This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new d02db92  MINOR: ChangelogReader should poll for duration 0 for standby restore (#8773)
d02db92 is described below

commit d02db92ba06e46d7608f002732baf1a8ac0cd141
Author: Rohan
AuthorDate: Mon Jun 1 22:33:22 2020 -0700

    MINOR: ChangelogReader should poll for duration 0 for standby restore (#8773)

    Co-authored-by: Guozhang Wang

    Reviewers: Guozhang Wang
---
 .../kafka/clients/consumer/MockConsumer.java       |  9 +++++-
 .../processor/internals/StoreChangelogReader.java  |  8 +++---
 .../internals/StoreChangelogReaderTest.java        | 32 ++++++++++++++++++++++
 3 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index a7fc1e8..7bf4c3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -65,6 +65,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private KafkaException pollException;
     private KafkaException offsetsException;
     private AtomicBoolean wakeup;
+    private Duration lastPollTimeout;
     private boolean closed;
     private boolean shouldRebalance;

@@ -157,13 +158,15 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     @Deprecated
     @Override
     public synchronized ConsumerRecords<K, V> poll(long timeout) {
-        return poll(Duration.ZERO);
+        return poll(Duration.ofMillis(timeout));
     }

     @Override
     public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         ensureNotClosed();

+        lastPollTimeout = timeout;
+
         // Synchronize around the entire execution so new tasks to be triggered on subsequent poll calls can be added in
         // the callback
         synchronized (pollTasks) {
@@ -556,6 +559,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         shouldRebalance = false;
     }

+    public Duration lastPollTimeout() {
+        return lastPollTimeout;
+    }
+
     @Override
     public void close(Duration timeout) {
         close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index c712bd3..6c6ff39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -216,9 +216,6 @@ public class StoreChangelogReader implements ChangelogReader {
         this.restoreConsumer = restoreConsumer;
         this.stateRestoreListener = stateRestoreListener;

-        // NOTE for restoring active and updating standby we may prefer different poll time
-        // in order to make sure we call the main consumer#poll in time.
-        // TODO: once both of these are moved to a separate thread this may no longer be a concern
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
         this.updateOffsetIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) == Long.MAX_VALUE ?
             DEFAULT_OFFSET_UPDATE_MS : config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
@@ -415,7 +412,10 @@ public class StoreChangelogReader implements ChangelogReader {

             final ConsumerRecords<byte[], byte[]> polledRecords;
             try {
-                polledRecords = restoreConsumer.poll(pollTime);
+                // for restoring active and updating standby we may prefer different poll time
+                // in order to make sure we call the main consumer#poll in time.
+                // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
+                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
             } catch (final InvalidOffsetException e) {
                 log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
                     "the consumer's position has fallen out of the topic partition offset range because the topic was " +
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e873e04..6956e79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -46,6 +46,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -227,6 +228,37 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     }

     @Test
+    public void shouldPollWithRightTimeout() {
+        EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+        EasyMock.replay(stateManager, storeMetadata, store);
+
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L));
+            }
+        };
+        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
+
+        final StoreChangelogReader changelogReader =
+            new StoreChangelogReader(time, config, logContext, consumer, callback);
+
+        changelogReader.register(tp, stateManager);
+
+        if (type == STANDBY) {
+            changelogReader.transitToUpdateStandby();
+        }
+
+        changelogReader.restore();
+
+        if (type == ACTIVE) {
+            assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
+        } else {
+            assertEquals(Duration.ZERO, consumer.lastPollTimeout());
+        }
+    }
+
+    @Test
     public void shouldRestoreFromPositionAndCheckForCompletion() {
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
         EasyMock.replay(stateManager, storeMetadata, store);
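
A brief usage note, not part of the commit itself: the new MockConsumer#lastPollTimeout() accessor simply reports the timeout passed to the most recent poll(Duration) call, which is what the new test above asserts on. The standalone sketch below shows the accessor in isolation; the class name is illustrative, and it assumes the kafka-clients artifact that ships MockConsumer is on the classpath.

    import java.time.Duration;

    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    public class LastPollTimeoutSketch {
        public static void main(final String[] args) {
            // MockConsumer now remembers the timeout of the most recent poll(Duration) call.
            final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

            // No subscription or records are needed just to observe the recorded timeout.
            consumer.poll(Duration.ZERO);
            System.out.println(consumer.lastPollTimeout()); // PT0S
            consumer.poll(Duration.ofMillis(100));
            System.out.println(consumer.lastPollTimeout()); // PT0.1S
        }
    }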