kafka-commits mailing list archives

From vvcep...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10185: Restoration info logging (#8896)
Date Fri, 19 Jun 2020 23:30:59 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 2f59188  KAFKA-10185: Restoration info logging (#8896)
2f59188 is described below

commit 2f59188d35df40fba8da24a36b46469301debdf6
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Fri Jun 19 14:17:58 2020 -0500

    KAFKA-10185: Restoration info logging (#8896)
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/errors/TaskCorruptedException.java     |  6 ++
 .../processor/internals/StoreChangelogReader.java  | 81 ++++++++++++++++++----
 .../internals/StoreChangelogReaderTest.java        | 19 ++++-
 3 files changed, 89 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
index 770ba70..52f668b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.errors;
 
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.TaskId;
 
@@ -36,7 +37,12 @@ public class TaskCorruptedException extends StreamsException {
 
     public TaskCorruptedException(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs) {
         super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence needs to be re-initialized");
+        this.taskWithChangelogs = taskWithChangelogs;
+    }
 
+    public TaskCorruptedException(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs,
+                                  final InvalidOffsetException e) {
+        super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence needs to be re-initialized", e);
         this.taskWithChangelogs = taskWithChangelogs;
     }
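
Note on the hunk above: the point of the new two-argument constructor is exception
chaining. Passing the InvalidOffsetException to the superclass as the cause preserves
the original stack trace, which then shows up as a "Caused by:" section in the logs.
A minimal, self-contained sketch of the pattern (hypothetical class names, not Kafka
code):

    public class CauseChainingDemo {
        static class CorruptedException extends RuntimeException {
            CorruptedException(final String message, final Throwable cause) {
                super(message, cause); // preserves the full causal chain
            }
        }

        public static void main(final String[] args) {
            try {
                throw new IllegalStateException("offset out of range");
            } catch (final IllegalStateException e) {
                // the printed trace ends with "Caused by: java.lang.IllegalStateException: ..."
                new CorruptedException("task corrupted", e).printStackTrace();
            }
        }
    }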
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index c6ee4d3..8c2d557 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -63,6 +63,8 @@ import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchComm
  * be completed, while standby tasks updating changelog would always be in restoring state after being initialized.
  */
 public class StoreChangelogReader implements ChangelogReader {
+    private static final long RESTORE_LOG_INTERVAL_MS = 10_000L;
+    private long lastRestoreLogTime = 0L;
 
     enum ChangelogState {
         // registered but need to be initialized (i.e. set its starting, end, limit offsets)
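
Note on the two fields added above: together they implement a simple wall-clock
throttle, so that restoration progress is logged at most once every ten seconds.
A minimal sketch of the pattern (hypothetical names; the patch itself goes through
Kafka's Time abstraction rather than a raw clock):

    import java.time.Clock;

    public class ThrottledLogger {
        private static final long LOG_INTERVAL_MS = 10_000L;
        private final Clock clock;
        private long lastLogTime = 0L; // zero forces the first call to log immediately

        public ThrottledLogger(final Clock clock) {
            this.clock = clock;
        }

        public void maybeLog(final String message) {
            final long now = clock.millis();
            if (now - lastLogTime > LOG_INTERVAL_MS) {
                System.out.println(message); // stand-in for log.info(...)
                lastLogTime = now;
            }
        }
    }

Resetting the timestamp to zero, as enforceRestoreActive() does in the next hunk,
guarantees that a progress line is emitted as soon as active restoration (re)starts.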
@@ -291,6 +293,7 @@ public class StoreChangelogReader implements ChangelogReader {
     public void enforceRestoreActive() {
         if (state != ChangelogReaderState.ACTIVE_RESTORING) {
             log.debug("Transiting to restore active tasks: {}", changelogs);
+            lastRestoreLogTime = 0L;
 
             // pause all partitions that are for standby tasks from the restore consumer
             pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
@@ -427,19 +430,20 @@ public class StoreChangelogReader implements ChangelogReader {
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
             } catch (final InvalidOffsetException e) {
-                log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
+                log.warn("Encountered " + e.getClass().getName() +
+                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
                     "the consumer's position has fallen out of the topic partition offset range because the topic was " +
                     "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e.getClass().getName(), e.partitions());
+                    " it later.", e);
 
                 final Map<TaskId, Collection<TopicPartition>> taskWithCorruptedChangelogs = new HashMap<>();
                 for (final TopicPartition partition : e.partitions()) {
                     final TaskId taskId = changelogs.get(partition).stateManager.taskId();
                     taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
                 }
-                throw new TaskCorruptedException(taskWithCorruptedChangelogs);
+                throw new TaskCorruptedException(taskWithCorruptedChangelogs, e);
             } catch (final KafkaException e) {
                 throw new StreamsException("Restore consumer get unexpected error polling records.", e);
             }
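
Two details worth noting in the hunk above: comparing enum constants with == instead
of equals() is the idiomatic, null-safe form in Java, and the catch block groups the
corrupted partitions by task with Map.computeIfAbsent, which creates the collection
on first sight of a key. A small standalone sketch of that grouping idiom (generic
names, not Kafka's types):

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;

    public class GroupingDemo {
        public static void main(final String[] args) {
            final Map<String, Collection<Integer>> partitionsByTask = new HashMap<>();
            final List<Integer> corrupted = Arrays.asList(0, 1, 2, 3);
            for (final int partition : corrupted) {
                final String taskId = "0_" + (partition % 2); // stand-in for the task lookup
                partitionsByTask.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
            }
            System.out.println(partitionsByTask); // e.g. {0_0=[0, 2], 0_1=[1, 3]}
        }
    }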
@@ -448,7 +452,7 @@ public class StoreChangelogReader implements ChangelogReader {
                 bufferChangelogRecords(restoringChangelogByPartition(partition), polledRecords.records(partition));
             }
 
-            for (final TopicPartition partition: restoringChangelogs) {
+            for (final TopicPartition partition : restoringChangelogs) {
                 // even if some partition do not have any accumulated data, we still trigger
                 // restoring since some changelog may not need to restore any at all, and the
                 // restore to end check needs to be executed still.
@@ -458,9 +462,48 @@ public class StoreChangelogReader implements ChangelogReader {
             }
 
             maybeUpdateLimitOffsetsForStandbyChangelogs();
+
+            maybeLogRestorationProgress();
+        }
+    }
+
+    private void maybeLogRestorationProgress() {
+        if (state == ChangelogReaderState.ACTIVE_RESTORING) {
+            if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
+                final Set<TopicPartition> topicPartitions = activeRestoringChangelogs();
+                if (!topicPartitions.isEmpty()) {
+                    final StringBuilder builder = new StringBuilder().append("Restoration in progress for ")
+                                                                     .append(topicPartitions.size())
+                                                                     .append(" partitions.");
+                    for (final TopicPartition partition : topicPartitions) {
+                        final ChangelogMetadata changelogMetadata = restoringChangelogByPartition(partition);
+                        builder.append(" {")
+                               .append(partition)
+                               .append(": ")
+                               .append("position=")
+                               .append(getPositionString(partition, changelogMetadata))
+                               .append(", end=")
+                               .append(changelogMetadata.restoreEndOffset)
+                               .append(", totalRestored=")
+                               .append(changelogMetadata.totalRestored)
+                               .append("}");
+                    }
+                    log.info(builder.toString());
+                    lastRestoreLogTime = time.milliseconds();
+                }
+            }
+        } else {
+            lastRestoreLogTime = 0L;
         }
     }
 
+    private static String getPositionString(final TopicPartition partition,
+                                            final ChangelogMetadata changelogMetadata) {
+        final ProcessorStateManager stateManager = changelogMetadata.stateManager;
+        final Long offsets = stateManager.changelogOffsets().get(partition);
+        return offsets == null ? "unknown" : String.valueOf(offsets);
+    }
+
     private void maybeUpdateLimitOffsetsForStandbyChangelogs() {
         // we only consider updating the limit offset for standbys if we are not restoring active tasks
         if (state == ChangelogReaderState.STANDBY_UPDATING &&
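
For reference, a short standalone sketch (made-up numbers, plain Java collections in
place of Kafka's metadata classes) that reproduces the message format built by
maybeLogRestorationProgress() above:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ProgressMessageDemo {
        public static void main(final String[] args) {
            // partition -> {position, end, totalRestored}
            final Map<String, long[]> restoring = new LinkedHashMap<>();
            restoring.put("store-changelog-0", new long[] {5L, 10L, 3L});
            restoring.put("store-changelog-1", new long[] {7L, 20L, 6L});

            final StringBuilder builder = new StringBuilder()
                .append("Restoration in progress for ")
                .append(restoring.size())
                .append(" partitions.");
            for (final Map.Entry<String, long[]> entry : restoring.entrySet()) {
                final long[] v = entry.getValue();
                builder.append(" {").append(entry.getKey())
                       .append(": position=").append(v[0])
                       .append(", end=").append(v[1])
                       .append(", totalRestored=").append(v[2])
                       .append("}");
            }
            // Restoration in progress for 2 partitions. {store-changelog-0: position=5,
            // end=10, totalRestored=3} {store-changelog-1: position=7, end=20, totalRestored=6}
            System.out.println(builder);
        }
    }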
@@ -496,8 +539,9 @@ public class StoreChangelogReader implements ChangelogReader {
             } else {
                 changelogMetadata.bufferedRecords.add(record);
                 final long offset = record.offset();
-                if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset)
+                if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset) {
                     changelogMetadata.bufferedLimitIndex = changelogMetadata.bufferedRecords.size();
+                }
             }
         }
     }
@@ -572,8 +616,9 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private Map<TopicPartition, Long> endOffsetForChangelogs(final Set<TopicPartition> partitions) {
-        if (partitions.isEmpty())
+        if (partitions.isEmpty()) {
             return Collections.emptyMap();
+        }
 
         try {
             final ListOffsetsResult result = adminClient.listOffsets(
@@ -617,8 +662,9 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private void initializeChangelogs(final Set<ChangelogMetadata> newPartitionsToRestore) {
-        if (newPartitionsToRestore.isEmpty())
+        if (newPartitionsToRestore.isEmpty()) {
             return;
+        }
 
         // for active changelogs, we need to find their end offset before transit to restoring
         // if the changelog is on source topic, then its end offset should be the minimum of
@@ -631,11 +677,13 @@ public class StoreChangelogReader implements ChangelogReader {
             final TopicPartition partition = metadata.storeMetadata.changelogPartition();
 
             // TODO K9113: when TaskType.GLOBAL is added we need to modify this
-            if (metadata.stateManager.taskType() == Task.TaskType.ACTIVE)
+            if (metadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
                 newPartitionsToFindEndOffset.add(partition);
+            }
 
-            if (metadata.stateManager.changelogAsSource(partition))
+            if (metadata.stateManager.changelogAsSource(partition)) {
                 newPartitionsToFindCommittedOffset.add(partition);
+            }
         }
 
         // NOTE we assume that all requested partitions will be included in the returned map for both end/committed
@@ -643,33 +691,36 @@ public class StoreChangelogReader implements ChangelogReader {
         final Map<TopicPartition, Long> endOffsets = endOffsetForChangelogs(newPartitionsToFindEndOffset);
         final Map<TopicPartition, Long> committedOffsets = committedOffsetForChangelogs(newPartitionsToFindCommittedOffset);
 
-        for (final TopicPartition partition: newPartitionsToFindEndOffset) {
+        for (final TopicPartition partition : newPartitionsToFindEndOffset) {
             final ChangelogMetadata changelogMetadata = changelogs.get(partition);
             final Long endOffset = endOffsets.get(partition);
             final Long committedOffset = newPartitionsToFindCommittedOffset.contains(partition) ?
                 committedOffsets.get(partition) : Long.valueOf(Long.MAX_VALUE);
 
             if (endOffset != null && committedOffset != null) {
-                if (changelogMetadata.restoreEndOffset != null)
+                if (changelogMetadata.restoreEndOffset != null) {
                     throw new IllegalStateException("End offset for " + partition +
                         " should only be initialized once. Existing value: " + changelogMetadata.restoreEndOffset
+
                         ", new value: (" + endOffset + ", " + committedOffset + ")");
+                }
 
                 changelogMetadata.restoreEndOffset = Math.min(endOffset, committedOffset);
 
                 log.debug("End offset for changelog {} initialized as {}.", partition, changelogMetadata.restoreEndOffset);
             } else {
-                if (!newPartitionsToRestore.remove(changelogMetadata))
+                if (!newPartitionsToRestore.remove(changelogMetadata)) {
                     throw new IllegalStateException("New changelogs to restore " + newPartitionsToRestore
+
                         " does not contain the one looking for end offset: " + partition
+ ", this should not happen.");
+                }
 
                 log.info("End offset for changelog {} cannot be found; will retry in the
next time.", partition);
             }
         }
 
         // try initialize limit offsets for standby tasks for the first time
-        if (!committedOffsets.isEmpty())
+        if (!committedOffsets.isEmpty()) {
             updateLimitOffsetsForStandbyChangelogs(committedOffsets);
+        }
 
         // add new partitions to the restore consumer and transit them to restoring state
         addChangelogsToRestoreConsumer(newPartitionsToRestore.stream().map(metadata -> metadata.storeMetadata.changelogPartition())
@@ -743,7 +794,7 @@ public class StoreChangelogReader implements ChangelogReader {
         // separate those who do not have the current offset loaded from checkpoint
         final Set<TopicPartition> newPartitionsWithoutStartOffset = new HashSet<>();
 
-        for (final ChangelogMetadata changelogMetadata: newPartitionsToRestore) {
+        for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
             final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
             final TopicPartition partition = storeMetadata.changelogPartition();
             final Long currentOffset = storeMetadata.offset();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index ad16cff..d82a4d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -59,11 +59,14 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.streams.processor.internals.Task.TaskType.ACTIVE;
-import static org.apache.kafka.streams.processor.internals.Task.TaskType.STANDBY;
 import static org.apache.kafka.streams.processor.internals.StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING;
 import static org.apache.kafka.streams.processor.internals.StoreChangelogReader.ChangelogReaderState.STANDBY_UPDATING;
+import static org.apache.kafka.streams.processor.internals.Task.TaskType.ACTIVE;
+import static org.apache.kafka.streams.processor.internals.Task.TaskType.STANDBY;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
@@ -223,6 +226,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     @Test
     public void shouldPollWithRightTimeout() {
         EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
         EasyMock.replay(stateManager, storeMetadata, store);
 
         consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
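
The expectation added to this test (and the ones below) is needed because
maybeLogRestorationProgress() now calls stateManager.changelogOffsets() while
restoring. A minimal sketch of the EasyMock record/replay idiom these lines rely on
(generic interface, not Kafka's ProcessorStateManager):

    import static org.easymock.EasyMock.createMock;
    import static org.easymock.EasyMock.expect;
    import static org.easymock.EasyMock.replay;
    import static org.easymock.EasyMock.verify;

    import java.util.Collections;
    import java.util.Map;

    public class EasyMockDemo {
        public interface StateManager {
            Map<String, Long> changelogOffsets();
        }

        public static void main(final String[] args) {
            final StateManager stateManager = createMock(StateManager.class);
            // record phase: one call to changelogOffsets() returning a fixed map
            expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap("tp", 5L));
            replay(stateManager); // switch the mock from record mode to replay mode

            System.out.println(stateManager.changelogOffsets()); // {tp=5}
            verify(stateManager); // fails if the recorded call was never made
        }
    }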
@@ -249,6 +253,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     @Test
     public void shouldRestoreFromPositionAndCheckForCompletion() {
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
         EasyMock.replay(stateManager, storeMetadata, store);
 
         adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@@ -315,6 +320,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     @Test
     public void shouldRestoreFromBeginningAndCheckCompletion() {
         EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
         EasyMock.replay(stateManager, storeMetadata, store);
 
         consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
@@ -411,6 +417,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     @Test
     public void shouldRequestPositionAndHandleTimeoutException() {
         EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
+        EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
         EasyMock.replay(activeStateManager, storeMetadata, store);
 
         final AtomicBoolean clearException = new AtomicBoolean(false);
@@ -472,6 +479,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     @Test
     public void shouldRequestEndOffsetsAndHandleTimeoutException() {
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
+        EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
         EasyMock.replay(activeStateManager, storeMetadata, store);
 
         final AtomicBoolean functionCalled = new AtomicBoolean(false);
@@ -541,6 +549,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     public void shouldRequestCommittedOffsetsAndHandleTimeoutException() {
         EasyMock.expect(stateManager.changelogAsSource(tp)).andReturn(true).anyTimes();
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
         EasyMock.replay(stateManager, storeMetadata, store);
 
         final AtomicBoolean functionCalled = new AtomicBoolean(false);
@@ -849,6 +858,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.expect(storeMetadataTwo.offset()).andReturn(0L).anyTimes();
         EasyMock.expect(activeStateManager.storeMetadata(tp1)).andReturn(storeMetadataOne).anyTimes();
         EasyMock.expect(activeStateManager.storeMetadata(tp2)).andReturn(storeMetadataTwo).anyTimes();
+        EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(mkMap(
+            mkEntry(tp, 5L),
+            mkEntry(tp1, 5L),
+            mkEntry(tp2, 5L)
+        )).anyTimes();
         EasyMock.replay(activeStateManager, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
 
         setupConsumer(10, tp);
@@ -889,6 +903,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.expect(storeMetadataTwo.offset()).andReturn(5L).anyTimes();
         EasyMock.expect(standbyStateManager.storeMetadata(tp1)).andReturn(storeMetadataOne).anyTimes();
         EasyMock.expect(standbyStateManager.storeMetadata(tp2)).andReturn(storeMetadataTwo).anyTimes();
+        EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
         EasyMock.replay(activeStateManager, standbyStateManager, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
 
         adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));

