From commits-return-10879-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Tue Dec 11 09:39:28 2018 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9B64A18736 for ; Tue, 11 Dec 2018 09:39:28 +0000 (UTC) Received: (qmail 13291 invoked by uid 500); 11 Dec 2018 09:39:28 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 13242 invoked by uid 500); 11 Dec 2018 09:39:28 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 13217 invoked by uid 99); 11 Dec 2018 09:39:28 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Dec 2018 09:39:28 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 291C082F21; Tue, 11 Dec 2018 09:27:38 +0000 (UTC) Date: Tue, 11 Dec 2018 09:27:37 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 1.1 updated: KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154452045605.15094.6744362070267741173@gitbox.apache.org> From: mjsax@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/1.1 X-Git-Reftype: branch X-Git-Oldrev: 0074414a42036370bae892ab784311a4c310ff5e X-Git-Newrev: 4cdbb3e5c19142d118f0f3999dd3e21deccb3643 X-Git-Rev: 4cdbb3e5c19142d118f0f3999dd3e21deccb3643 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 
1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 4cdbb3e KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946) 4cdbb3e is described below commit 4cdbb3e5c19142d118f0f3999dd3e21deccb3643 Author: linyli001 <45358531+linyli001@users.noreply.github.com> AuthorDate: Tue Dec 11 16:40:18 2018 +0800 KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946) Reviewer: Matthias J. Sax , John Roesler --- .../kafka/clients/consumer/MockConsumer.java | 4 +++ .../processor/internals/StoreChangelogReader.java | 3 ++ .../internals/StoreChangelogReaderTest.java | 36 ++++++++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index ceb7024..07cb415 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -184,6 +184,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> { if (!subscriptions.isPaused(entry.getKey())) { final List<ConsumerRecord<K, V>> recs = entry.getValue(); for (final ConsumerRecord<K, V> rec : recs) { + if (beginningOffsets.get(entry.getKey()) != null && beginningOffsets.get(entry.getKey()) > subscriptions.position(entry.getKey())) { + throw new OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), subscriptions.position(entry.getKey()))); + } + if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) { 
results.get(entry.getKey()).add(rec); subscriptions.position(entry.getKey(), rec.offset() + 1); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index e0fc82d..501ddef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -96,6 +96,9 @@ public class StoreChangelogReader implements ChangelogReader { for (final TopicPartition partition : partitions) { final StreamTask task = active.restoringTaskFor(partition); log.info("Reinitializing StreamTask {}", task); + + final StateRestorer restorer = stateRestorers.get(partition); + restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT); task.reinitializeStateStoresForPartitions(recoverableException.partitions()); } restoreConsumer.seekToBeginning(partitions); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index a117dc3..5391f31 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -152,6 +152,42 @@ public class StoreChangelogReaderTest { assertThat(callback.restored.size(), equalTo(messages)); } + @Test + public void shouldRecoverFromOffsetOutOfRangeExceptionAndRestoreFromStart() { + final int messages = 10; + final int startOffset = 5; + final long expiredCheckpoint = 1L; + assignPartition(messages, topicPartition); + consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, (long) startOffset)); + consumer.updateEndOffsets(Collections.singletonMap(topicPartition, (long) (messages + 
startOffset))); + + addRecords(messages, topicPartition, startOffset); + consumer.assign(Collections.emptyList()); + + final StateRestorer stateRestorer = new StateRestorer( + topicPartition, + restoreListener, + expiredCheckpoint, + Long.MAX_VALUE, + true, + "storeName"); + changelogReader.register(stateRestorer); + + EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + EasyMock.replay(active, task); + + // first restore call "fails" since OffsetOutOfRangeException but we should not die with an exception + assertEquals(0, changelogReader.restore(active).size()); + //the starting offset for stateRestorer is set to NO_CHECKPOINT + assertThat(stateRestorer.checkpoint(), equalTo(-1L)); + + //restore the active task again + changelogReader.register(stateRestorer); + //the restored task should return completed partition without Exception. + assertEquals(1, changelogReader.restore(active).size()); + //the restored size should be equal to message length. + assertThat(callback.restored.size(), equalTo(messages)); + } @Test public void shouldRestoreMessagesFromCheckpoint() {