This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b492296 MINOR: fix checkpoint write failure warning log (#6008)
b492296 is described below
commit b49229675748afae584799e11a8b8f090c4218a9
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Sat Dec 8 21:00:57 2018 -0600
MINOR: fix checkpoint write failure warning log (#6008)
We saw a log statement in which the cause of the failure to write a checkpoint was not
properly logged.
This change logs the exception properly and also verifies the log message.
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
.../processor/internals/ProcessorStateManager.java | 2 +-
.../internals/ProcessorStateManagerTest.java | 40 +++++++++++++++++
.../internals/testutil/LogCaptureAppender.java | 50 ++++++++++++++++++++++
3 files changed, 91 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3d0c664..d08779f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -292,7 +292,7 @@ public class ProcessorStateManager extends AbstractStateManager {
try {
checkpoint.write(this.checkpointableOffsets);
} catch (final IOException e) {
- log.warn("Failed to write offset checkpoint file to {}: {}", checkpoint, e);
+ log.warn("Failed to write offset checkpoint file to [{}]", checkpoint, e);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 3d91ee0..183f9ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockBatchingStateRestoreListener;
import org.apache.kafka.test.MockKeyValueStore;
@@ -51,7 +52,9 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.singletonList;
+import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
@@ -564,6 +567,43 @@ public class ProcessorStateManagerTest {
}
}
+ // if the optional is absent, it'll throw an exception and fail the test.
+ @SuppressWarnings("OptionalGetWithoutIsPresent")
+ @Test
+ public void shouldLogAWarningIfCheckpointThrowsAnIOException() {
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+ final ProcessorStateManager stateMgr;
+ try {
+ stateMgr = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ false,
+ stateDirectory,
+ Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
+ changelogReader,
+ false,
+ logContext);
+ } catch (final IOException e) {
+ e.printStackTrace();
+ throw new AssertionError(e);
+ }
+ stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
+
+ stateDirectory.clean();
+ stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L));
+ LogCaptureAppender.unregister(appender);
+
+ final List<LogCaptureAppender.Event> messages = appender.getEvents();
+
+ final LogCaptureAppender.Event lastEvent = messages.get(messages.size() - 1);
+
+ assertThat(lastEvent.getLevel(), is("WARN"));
+ assertThat(lastEvent.getMessage(), startsWith("process-state-manager-test Failed
to write offset checkpoint file to ["));
+ assertThat(lastEvent.getMessage(), endsWith(".checkpoint]"));
+ assertThat(lastEvent.getThrowableInfo().get(), startsWith("java.io.FileNotFoundException:
"));
+ }
+
@Test
public void shouldFlushAllStoresEvenIfStoreThrowsException() throws IOException {
final ProcessorStateManager stateManager = new ProcessorStateManager(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
index ffb8799..a1f7b31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
@@ -24,10 +24,36 @@ import org.apache.log4j.spi.LoggingEvent;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
public class LogCaptureAppender extends AppenderSkeleton {
private final LinkedList<LoggingEvent> events = new LinkedList<>();
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ public static class Event {
+ private String level;
+ private String message;
+ private Optional<String> throwableInfo;
+
+ Event(final String level, final String message, final Optional<String> throwableInfo)
{
+ this.level = level;
+ this.message = message;
+ this.throwableInfo = throwableInfo;
+ }
+
+ public String getLevel() {
+ return level;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public Optional<String> getThrowableInfo() {
+ return throwableInfo;
+ }
+ }
+
public static LogCaptureAppender createAndRegister() {
final LogCaptureAppender logCaptureAppender = new LogCaptureAppender();
Logger.getRootLogger().addAppender(logCaptureAppender);
@@ -59,6 +85,30 @@ public class LogCaptureAppender extends AppenderSkeleton {
return result;
}
+ public List<Event> getEvents() {
+ final LinkedList<Event> result = new LinkedList<>();
+ synchronized (events) {
+ for (final LoggingEvent event : events) {
+ final String[] throwableStrRep = event.getThrowableStrRep();
+ final Optional<String> throwableString;
+ if (throwableStrRep == null) {
+ throwableString = Optional.empty();
+ } else {
+ final StringBuilder throwableStringBuilder = new StringBuilder();
+
+ for (final String s : throwableStrRep) {
+ throwableStringBuilder.append(s);
+ }
+
+ throwableString = Optional.of(throwableStringBuilder.toString());
+ }
+
+ result.add(new Event(event.getLevel().toString(), event.getRenderedMessage(),
throwableString));
+ }
+ }
+ return result;
+ }
+
@Override
public void close() {
|