This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 17f9f3a  KAFKA-10221: Backport fix for KAFKA-9603 to 2.5 (#8987)
17f9f3a is described below

commit 17f9f3ae56a1049f035841a36cdc70a33ba91779
Author: Bruno Cadonna
AuthorDate: Wed Jul 8 04:52:55 2020 +0200

    KAFKA-10221: Backport fix for KAFKA-9603 to 2.5 (#8987)

    KAFKA-9603 reports that the number of open files keeps increasing in RocksDB.
    The reason is that bulk loading is turned on but never turned off in segmented
    state stores for standby tasks. This bug was fixed in 2.6 through PR #8661 by
    using code that is not present in 2.5. So cherry-picking was not possible.

    Reviewers: John Roesler
---
 .../processor/internals/StandbyContextImpl.java       |  2 +-
 .../AbstractRocksDBSegmentedBytesStore.java           |  7 ++-
 .../AbstractRocksDBSegmentedBytesStoreTest.java       | 52 +++++++++++++++++++---
 3 files changed, 53 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 9a94ad6..2d0d3d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.time.Duration;
 
-class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
+public class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 4c32104..dd0feec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallba
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -251,7 +252,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
             // This handles the case that state store is moved to a new client and does not
             // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
             // will only close the database and open it again with bulk loading enabled.
-            if (!bulkLoadSegments.contains(segment)) {
+            if (!bulkLoadSegments.contains(segment) && isStoreForActiveTask()) {
                 segment.toggleDbForBulkLoading(true);
                 // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that
                 // makes the open flag for the newly created store.
@@ -270,6 +271,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
         return writeBatchMap;
     }
 
+    private boolean isStoreForActiveTask() {
+        return context instanceof ProcessorContextImpl;
+    }
+
     private void toggleForBulkLoading(final boolean prepareForBulkload) {
         for (final S segment : segments.allSegments()) {
             segment.toggleDbForBulkLoading(prepareForBulkload);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 7c4f443..6a4b9c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -29,10 +29,16 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.StandbyContextImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -64,6 +70,9 @@ import java.util.SimpleTimeZone;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.niceMock;
+import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -355,7 +364,43 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
     }
 
     @Test
-    public void shouldRestoreToByteStore() {
+    public void shouldRestoreToByteStoreForActiveTask() {
+        final ProcessorContextImpl context = niceMock(ProcessorContextImpl.class);
+        verifyRestoreToByteStore(context);
+
+        // Bulk loading is enabled during recovery for active tasks
+        // (in bulk loading mode the compaction trigger for level 0 is set to 1 << 30, i.e. 1,073,741,824)
+        for (final S segment : bytesStore.getSegments()) {
+            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30));
+        }
+    }
+
+    @Test
+    public void shouldRestoreToByteStoreForStandbyTask() {
+        final StandbyContextImpl context = niceMock(StandbyContextImpl.class);
+        verifyRestoreToByteStore(context);
+
+        // Bulk loading is disabled during recovery for stand-by tasks
+        // (in bulk loading mode the compaction trigger for level 0 is set to 1 << 30, i.e. 1,073,741,824)
+        for (final S segment : bytesStore.getSegments()) {
+            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(4));
+        }
+    }
+
+    private void verifyRestoreToByteStore(final InternalProcessorContext context) {
+        bytesStore = getBytesStore();
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST);
+        streamsMetrics.setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger());
+        expect(context.metrics()).andStubReturn(streamsMetrics);
+        final TaskId taskId = new TaskId(0, 0);
+        expect(context.taskId()).andStubReturn(taskId);
+        final Map<String, Object> config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals();
+        expect(context.appConfigs()).andStubReturn(config);
+        expect(context.stateDir()).andStubReturn(stateDir);
+        replay(context);
+        bytesStore.init(context, bytesStore);
+
         // 0 segments initially.
         assertEquals(0, bytesStore.getSegments().size());
         final String key = "a";
@@ -367,11 +412,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         }
         // 2 segments are created during restoration.
         assertEquals(2, bytesStore.getSegments().size());
-        // Bulk loading is enabled during recovery.
-        for (final S segment : bytesStore.getSegments()) {
-            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30));
-        }
-
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
         expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
         expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
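
For context on the values asserted in the tests above: the 1 << 30 and 4 compaction
triggers come from RocksDB's own option defaults rather than from Kafka Streams. Below
is a minimal, standalone sketch of what "bulk loading mode" means at the RocksDB level.
It is not part of this patch; it assumes only the stock org.rocksdb Java API, and the
class name BulkLoadOptionsSketch is made up for illustration.

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;

    // Illustrative only: shows the RocksDB option that Kafka Streams' bulk loading
    // mode changes, and why the tests expect 1 << 30 (bulk load) vs. 4 (default).
    public class BulkLoadOptionsSketch {

        public static void main(final String[] args) {
            RocksDB.loadLibrary();

            try (final Options options = new Options()) {
                // RocksDB default: level-0 compaction starts once 4 files accumulate.
                System.out.println(options.level0FileNumCompactionTrigger());   // 4

                // Bulk loading mode (used while restoring an active task's store):
                // compactions are effectively deferred so restoration can write as
                // fast as possible; the trigger jumps to 1 << 30 = 1,073,741,824.
                options.prepareForBulkLoad();
                System.out.println(options.level0FileNumCompactionTrigger());   // 1073741824
            }
        }
    }

The backported fix keeps standby tasks on the default settings, so their segment stores
are compacted as usual and file handles are released, which is what the standby-task
test above verifies.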