kafka-commits mailing list archives

From vvcep...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-10221: Backport fix for KAFKA-9603 to 2.5 (#8987)
Date Wed, 08 Jul 2020 02:53:44 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 17f9f3a  KAFKA-10221: Backport fix for KAFKA-9603 to 2.5 (#8987)
17f9f3a is described below

commit 17f9f3ae56a1049f035841a36cdc70a33ba91779
Author: Bruno Cadonna <bruno@confluent.io>
AuthorDate: Wed Jul 8 04:52:55 2020 +0200

    KAFKA-10221: Backport fix for KAFKA-9603 to 2.5 (#8987)
    
    KAFKA-9603 reports that the number of open files keeps increasing
    in RocksDB. The reason is that bulk loading is turned on but
    never turned off in segmented state stores for standby tasks.
    
    This bug was fixed in 2.6 through PR #8661, but that fix relies on
    code that is not present in 2.5, so it could not be cherry-picked.
    
    Reviewers: John Roesler <vvcephei@apache.org>
---
 .../processor/internals/StandbyContextImpl.java    |  2 +-
 .../AbstractRocksDBSegmentedBytesStore.java        |  7 ++-
 .../AbstractRocksDBSegmentedBytesStoreTest.java    | 52 +++++++++++++++++++---
 3 files changed, 53 insertions(+), 8 deletions(-)
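
For orientation, here is a minimal, self-contained sketch of the idea behind the fix. It is not the Kafka Streams code: Context, ActiveTaskContext, StandbyTaskContext, Segment, and prepareSegmentForRestore are made-up stand-ins. In the actual change the check is context instanceof ProcessorContextImpl (standby tasks run with a StandbyContextImpl), and only stores of active tasks are put into bulk-loading mode during restoration, so standby stores keep the default RocksDB options and the number of open files stops growing.

import java.util.HashSet;
import java.util.Set;

class BulkLoadingSketch {

    interface Context { }
    static class ActiveTaskContext implements Context { }   // stand-in for ProcessorContextImpl
    static class StandbyTaskContext implements Context { }  // stand-in for StandbyContextImpl

    static class Segment {
        int level0FileNumCompactionTrigger = 4;              // RocksDB default
        void toggleDbForBulkLoading(final boolean enable) {
            // Bulk loading raises the level-0 compaction trigger so that
            // compaction effectively never fires during restoration.
            level0FileNumCompactionTrigger = enable ? (1 << 30) : 4;
        }
    }

    private final Context context;
    private final Set<Segment> bulkLoadSegments = new HashSet<>();

    BulkLoadingSketch(final Context context) {
        this.context = context;
    }

    // The essence of the fix: only segments of active-task stores enter
    // bulk-loading mode during restoration; standby-task stores are left alone.
    void prepareSegmentForRestore(final Segment segment) {
        if (!bulkLoadSegments.contains(segment) && isStoreForActiveTask()) {
            segment.toggleDbForBulkLoading(true);
            bulkLoadSegments.add(segment);
        }
    }

    private boolean isStoreForActiveTask() {
        return context instanceof ActiveTaskContext;
    }

    public static void main(final String[] args) {
        final Segment active = new Segment();
        new BulkLoadingSketch(new ActiveTaskContext()).prepareSegmentForRestore(active);
        System.out.println(active.level0FileNumCompactionTrigger);   // 1073741824 (1 << 30)

        final Segment standby = new Segment();
        new BulkLoadingSketch(new StandbyTaskContext()).prepareSegmentForRestore(standby);
        System.out.println(standby.level0FileNumCompactionTrigger);  // 4 (default)
    }
}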

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 9a94ad6..2d0d3d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.time.Duration;
 
-class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
+public class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
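
(Side note: the only change to StandbyContextImpl is the widened visibility, package-private to public. Presumably this is needed because the new test in org.apache.kafka.streams.state.internals mocks StandbyContextImpl with EasyMock, which requires the class to be reachable and subclassable from outside its own package.)
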
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 4c32104..dd0feec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallba
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -251,7 +252,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
                 // This handles the case that state store is moved to a new client and does not
                 // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
                 // will only close the database and open it again with bulk loading enabled.
-                if (!bulkLoadSegments.contains(segment)) {
+                if (!bulkLoadSegments.contains(segment) && isStoreForActiveTask()) {
                     segment.toggleDbForBulkLoading(true);
                     // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that
                     // makes the open flag for the newly created store.
@@ -270,6 +271,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
         return writeBatchMap;
     }
 
+    private boolean isStoreForActiveTask() {
+        return context instanceof ProcessorContextImpl;
+    }
+
     private void toggleForBulkLoading(final boolean prepareForBulkload) {
         for (final S segment : segments.allSegments()) {
             segment.toggleDbForBulkLoading(prepareForBulkload);
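
For reference, what "bulk loading" means at the RocksDB level: the store temporarily raises the level-0 compaction trigger (the new tests below assert 1 << 30 for bulk loading and the default of 4 otherwise) so compaction does not interfere with restoration. A small standalone sketch against the rocksdbjni Options API, illustrative only and not how the store class configures its segments:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class BulkLoadOptionsSketch {
    public static void main(final String[] args) {
        RocksDB.loadLibrary();
        try (final Options options = new Options()) {
            // RocksDB's default level-0 compaction trigger is 4 files.
            System.out.println(options.level0FileNumCompactionTrigger());  // 4
            // In bulk-loading mode the trigger is raised to 1 << 30 so that
            // level-0 compaction effectively never fires during restoration.
            options.setLevel0FileNumCompactionTrigger(1 << 30);
            System.out.println(options.level0FileNumCompactionTrigger());  // 1073741824
        }
    }
}
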
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 7c4f443..6a4b9c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -29,10 +29,16 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.StandbyContextImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -64,6 +70,9 @@ import java.util.SimpleTimeZone;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.niceMock;
+import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -355,7 +364,43 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
     }
 
     @Test
-    public void shouldRestoreToByteStore() {
+    public void shouldRestoreToByteStoreForActiveTask() {
+        final ProcessorContextImpl context = niceMock(ProcessorContextImpl.class);
+        verifyRestoreToByteStore(context);
+
+        // Bulk loading is enabled during recovery for active tasks
+        // (in bulk loading mode the compaction trigger for level 0 is set to 1 << 30, i.e. 1,073,741,824)
+        for (final S segment : bytesStore.getSegments()) {
+            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30));
+        }
+    }
+
+    @Test
+    public void shouldRestoreToByteStoreForStandbyTask() {
+        final StandbyContextImpl context = niceMock(StandbyContextImpl.class);
+        verifyRestoreToByteStore(context);
+
+        // Bulk loading is disabled during recovery for stand-by tasks
+        // (in bulk loading mode the compaction trigger for level 0 is set to 1 << 30, i.e. 1,073,741,824)
+        for (final S segment : bytesStore.getSegments()) {
+            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(4));
+        }
+    }
+
+    private void verifyRestoreToByteStore(final InternalProcessorContext context) {
+        bytesStore = getBytesStore();
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST);
+        streamsMetrics.setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger());
+        expect(context.metrics()).andStubReturn(streamsMetrics);
+        final TaskId taskId = new TaskId(0, 0);
+        expect(context.taskId()).andStubReturn(taskId);
+        final Map<String, Object> config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals();
+        expect(context.appConfigs()).andStubReturn(config);
+        expect(context.stateDir()).andStubReturn(stateDir);
+        replay(context);
+        bytesStore.init(context, bytesStore);
+
         // 0 segments initially.
         assertEquals(0, bytesStore.getSegments().size());
         final String key = "a";
@@ -367,11 +412,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         // 2 segments are created during restoration.
         assertEquals(2, bytesStore.getSegments().size());
 
-        // Bulk loading is enabled during recovery.
-        for (final S segment : bytesStore.getSegments()) {
-            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30));
-        }
-
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
         expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
         expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
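
The refactored test above stubs the two context implementations with EasyMock nice mocks. For readers unfamiliar with that pattern, here is a tiny self-contained example of the same niceMock / andStubReturn / replay sequence (Clock is a made-up interface, not a Kafka type):

import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;

public class EasyMockPatternSketch {

    interface Clock {
        long now();
    }

    public static void main(final String[] args) {
        // A nice mock returns default values for any call that has not been stubbed,
        // so the object under test may invoke methods the test does not care about.
        final Clock clock = niceMock(Clock.class);
        // andStubReturn allows the stubbed call any number of times (including zero).
        expect(clock.now()).andStubReturn(42L);
        // Switch the mock from record mode to replay mode before handing it out.
        replay(clock);

        System.out.println(clock.now());  // 42
    }
}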

