kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-8040: Streams handle initTransactions timeout (#6416)
Date Mon, 11 Mar 2019 22:18:32 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new af7fabc  KAFKA-8040: Streams handle initTransactions timeout (#6416)
af7fabc is described below

commit af7fabc5a9302708ea277e4252a7382f19ac9a11
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Mon Mar 11 17:18:11 2019 -0500

    KAFKA-8040: Streams handle initTransactions timeout (#6416)
    
    https://issues.apache.org/jira/browse/KAFKA-7934
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
---
 .../processor/internals/RecordCollectorImpl.java   |  19 ++-
 .../streams/processor/internals/StreamTask.java    |  24 +++-
 .../processor/internals/StreamTaskTest.java        | 157 +++++++++++++++++++--
 .../internals/testutil/LogCaptureAppender.java     |  52 ++++++-
 4 files changed, 230 insertions(+), 22 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 554cc85..e483f58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -204,11 +204,20 @@ public class RecordCollectorImpl implements RecordCollector {
                 }
             });
         } catch (final TimeoutException e) {
-            log.error("Timeout exception caught when sending record to topic {}. " +
-                "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
-                "its internal buffer fills up. " +
-                "You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
-            throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic));
+            log.error(
+                "Timeout exception caught when sending record to topic {}. " +
+                    "This might happen if the producer cannot send data to the Kafka cluster and thus, " +
+                    "its internal buffer fills up. " +
+                    "This can also happen if the broker is slow to respond, if the network connection to " +
+                    "the broker was interrupted, or if similar circumstances arise. " +
+                    "You can increase producer parameter `max.block.ms` to increase this timeout.",
+                topic,
+                e
+            );
+            throw new StreamsException(
+                String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic),
+                e
+            );
         } catch (final Exception uncaughtException) {
             if (uncaughtException instanceof KafkaException &&
                 uncaughtException.getCause() instanceof ProducerFencedException) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ce8e3c1..a325b24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
@@ -222,7 +223,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         // initialize transactions if eos is turned on, which will block if the previous transaction has not
         // completed yet; do not start the first transaction until the topology has been initialized later
         if (eosEnabled) {
-            this.producer.initTransactions();
+            initializeTransactions();
         }
     }
 
@@ -270,7 +271,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                 throw new IllegalStateException("Task producer should be null.");
             }
             producer = producerSupplier.get();
-            producer.initTransactions();
+            initializeTransactions();
             recordCollector.init(producer);
         }
     }
@@ -796,4 +797,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     Producer<byte[], byte[]> getProducer() {
         return producer;
     }
+
+    private void initializeTransactions() {
+        try {
+            producer.initTransactions();
+        } catch (final TimeoutException retriable) {
+            log.error(
+                "Timeout exception caught when initializing transactions for task {}. " +
+                    "This might happen if the broker is slow to respond, if the network connection to " +
+                    "the broker was interrupted, or if similar circumstances arise. " +
+                    "You can increase producer parameter `max.block.ms` to increase this timeout.",
+                id,
+                retriable
+            );
+            throw new StreamsException(
+                format("%sFailed to initialize task %s due to timeout.", logPrefix, id),
+                retriable
+            );
+        }
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index bacfcb7..2e61d7f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -45,6 +46,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
@@ -60,17 +62,20 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
+import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -164,7 +169,7 @@ public class StreamTaskTest {
 
     @Before
     public void setup() {
-        consumer.assign(Arrays.asList(partition1, partition2));
+        consumer.assign(asList(partition1, partition2));
         stateDirectory = new StateDirectory(createConfig(false), new MockTime());
     }
 
@@ -183,18 +188,142 @@ public class StreamTaskTest {
         }
     }
 
+    @Test
+    public void shouldHandleInitTransactionsTimeoutExceptionOnCreation() {
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        final ProcessorTopology topology = ProcessorTopology.withSources(
+            asList(source1, source2, processorStreamTime, processorSystemTime),
+            mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
+        );
+
+        source1.addChild(processorStreamTime);
+        source2.addChild(processorStreamTime);
+        source1.addChild(processorSystemTime);
+        source2.addChild(processorSystemTime);
+
+        try {
+            new StreamTask(
+                taskId00,
+                partitions,
+                topology,
+                consumer,
+                changelogReader,
+                createConfig(true),
+                streamsMetrics,
+                stateDirectory,
+                null,
+                time,
+                () -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) {
+                    @Override
+                    public void initTransactions() {
+                        throw new TimeoutException("test");
+                    }
+                }
+            );
+            fail("Expected an exception");
+        } catch (final StreamsException expected) {
+            // make sure we log the explanation as an ERROR
+            assertTimeoutErrorLog(appender);
+
+            // make sure we report the correct message
+            assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout."));
+
+            // make sure we preserve the cause
+            assertEquals(expected.getCause().getClass(), TimeoutException.class);
+            assertThat(expected.getCause().getMessage(), is("test"));
+        }
+        LogCaptureAppender.unregister(appender);
+    }
+
+    @Test
+    public void shouldHandleInitTransactionsTimeoutExceptionOnResume() {
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        final ProcessorTopology topology = ProcessorTopology.withSources(
+            asList(source1, source2, processorStreamTime, processorSystemTime),
+            mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
+        );
+
+        source1.addChild(processorStreamTime);
+        source2.addChild(processorStreamTime);
+        source1.addChild(processorSystemTime);
+        source2.addChild(processorSystemTime);
+
+        final AtomicBoolean timeOut = new AtomicBoolean(false);
+
+        final StreamTask testTask = new StreamTask(
+            taskId00,
+            partitions,
+            topology,
+            consumer,
+            changelogReader,
+            createConfig(true),
+            streamsMetrics,
+            stateDirectory,
+            null,
+            time,
+            () -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) {
+                @Override
+                public void initTransactions() {
+                    if (timeOut.get()) {
+                        throw new TimeoutException("test");
+                    } else {
+                        super.initTransactions();
+                    }
+                }
+            }
+        );
+        testTask.initializeTopology();
+        testTask.suspend();
+        timeOut.set(true);
+        try {
+            testTask.resume();
+            fail("Expected an exception");
+        } catch (final StreamsException expected) {
+            // make sure we log the explanation as an ERROR
+            assertTimeoutErrorLog(appender);
+
+            // make sure we report the correct message
+            assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout."));
+
+            // make sure we preserve the cause
+            assertEquals(expected.getCause().getClass(), TimeoutException.class);
+            assertThat(expected.getCause().getMessage(), is("test"));
+        }
+        LogCaptureAppender.unregister(appender);
+    }
+
+    private void assertTimeoutErrorLog(final LogCaptureAppender appender) {
+
+        final String expectedErrorLogMessage =
+            "task [0_0] Timeout exception caught when initializing transactions for task 0_0. " +
+                "This might happen if the broker is slow to respond, if the network " +
+                "connection to the broker was interrupted, or if similar circumstances arise. " +
+                "You can increase producer parameter `max.block.ms` to increase this timeout.";
+
+        final List<String> expectedError =
+            appender
+                .getEvents()
+                .stream()
+                .filter(event -> event.getMessage().equals(expectedErrorLogMessage))
+                .map(LogCaptureAppender.Event::getLevel)
+                .collect(Collectors.toList());
+        assertThat(expectedError, is(singletonList("ERROR")));
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testProcessOrder() {
         task = createStatelessTask(createConfig(false));
 
-        task.addRecords(partition1, Arrays.asList(
+        task.addRecords(partition1, asList(
             getConsumerRecord(partition1, 10),
             getConsumerRecord(partition1, 20),
             getConsumerRecord(partition1, 30)
         ));
 
-        task.addRecords(partition2, Arrays.asList(
+        task.addRecords(partition2, asList(
             getConsumerRecord(partition2, 25),
             getConsumerRecord(partition2, 35),
             getConsumerRecord(partition2, 45)
@@ -259,12 +388,12 @@ public class StreamTaskTest {
     public void testPauseResume() {
         task = createStatelessTask(createConfig(false));
 
-        task.addRecords(partition1, Arrays.asList(
+        task.addRecords(partition1, asList(
             getConsumerRecord(partition1, 10),
             getConsumerRecord(partition1, 20)
         ));
 
-        task.addRecords(partition2, Arrays.asList(
+        task.addRecords(partition2, asList(
             getConsumerRecord(partition2, 35),
             getConsumerRecord(partition2, 45),
             getConsumerRecord(partition2, 55),
@@ -278,7 +407,7 @@ public class StreamTaskTest {
         assertEquals(1, consumer.paused().size());
         assertTrue(consumer.paused().contains(partition2));
 
-        task.addRecords(partition1, Arrays.asList(
+        task.addRecords(partition1, asList(
             getConsumerRecord(partition1, 30),
             getConsumerRecord(partition1, 40),
             getConsumerRecord(partition1, 50)
@@ -316,7 +445,7 @@ public class StreamTaskTest {
         task.initializeStateStores();
         task.initializeTopology();
 
-        task.addRecords(partition1, Arrays.asList(
+        task.addRecords(partition1, asList(
             getConsumerRecord(partition1, 0),
             getConsumerRecord(partition1, 20),
             getConsumerRecord(partition1, 32),
@@ -324,7 +453,7 @@ public class StreamTaskTest {
             getConsumerRecord(partition1, 60)
         ));
 
-        task.addRecords(partition2, Arrays.asList(
+        task.addRecords(partition2, asList(
             getConsumerRecord(partition2, 25),
             getConsumerRecord(partition2, 35),
             getConsumerRecord(partition2, 45),
@@ -407,14 +536,14 @@ public class StreamTaskTest {
         task.initializeStateStores();
         task.initializeTopology();
 
-        task.addRecords(partition1, Arrays.asList(
+        task.addRecords(partition1, asList(
             getConsumerRecord(partition1, 20),
             getConsumerRecord(partition1, 142),
             getConsumerRecord(partition1, 155),
             getConsumerRecord(partition1, 160)
         ));
 
-        task.addRecords(partition2, Arrays.asList(
+        task.addRecords(partition2, asList(
             getConsumerRecord(partition2, 25),
             getConsumerRecord(partition2, 145),
             getConsumerRecord(partition2, 159),
@@ -493,13 +622,13 @@ public class StreamTaskTest {
         task.initializeStateStores();
         task.initializeTopology();
 
-        task.addRecords(partition1, Arrays.asList(
+        task.addRecords(partition1, asList(
             getConsumerRecord(partition1, 20),
             getConsumerRecord(partition1, 30),
             getConsumerRecord(partition1, 40)
         ));
 
-        task.addRecords(partition2, Arrays.asList(
+        task.addRecords(partition2, asList(
             getConsumerRecord(partition2, 25),
             getConsumerRecord(partition2, 35),
             getConsumerRecord(partition2, 45)
@@ -1198,7 +1327,7 @@ public class StreamTaskTest {
             mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(repartition.topic(), (SourceNode) source2)),
             Collections.singleton(repartition.topic())
         );
-        consumer.assign(Arrays.asList(partition1, repartition));
+        consumer.assign(asList(partition1, repartition));
 
         task = new StreamTask(
             taskId00,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
index b6f5769..462159f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
@@ -21,11 +21,37 @@ import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 
+import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 
 public class LogCaptureAppender extends AppenderSkeleton {
-    private final LinkedList<LoggingEvent> events = new LinkedList<>();
+    private final Deque<LoggingEvent> events = new LinkedList<>();
+
+    public static class Event {
+        private final String level;
+        private final String message;
+        private final Optional<String> throwableInfo;
+
+        Event(final String level, final String message, final Optional<String> throwableInfo) {
+            this.level = level;
+            this.message = message;
+            this.throwableInfo = throwableInfo;
+        }
+
+        public String getLevel() {
+            return level;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public Optional<String> getThrowableInfo() {
+            return throwableInfo;
+        }
+    }
 
     public static LogCaptureAppender createAndRegister() {
         final LogCaptureAppender logCaptureAppender = new LogCaptureAppender();
@@ -54,6 +80,30 @@ public class LogCaptureAppender extends AppenderSkeleton {
         return result;
     }
 
+    public List<Event> getEvents() {
+        final LinkedList<Event> result = new LinkedList<>();
+        synchronized (events) {
+            for (final LoggingEvent event : events) {
+                final String[] throwableStrRep = event.getThrowableStrRep();
+                final Optional<String> throwableString;
+                if (throwableStrRep == null) {
+                    throwableString = Optional.empty();
+                } else {
+                    final StringBuilder throwableStringBuilder = new StringBuilder();
+
+                    for (final String s : throwableStrRep) {
+                        throwableStringBuilder.append(s);
+                    }
+
+                    throwableString = Optional.of(throwableStringBuilder.toString());
+                }
+
+                result.add(new Event(event.getLevel().toString(), event.getRenderedMessage(), throwableString));
+            }
+        }
+        return result;
+    }
+
     @Override
     public void close() {
 


Mime
View raw message