kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7103: Use bulkloading for RocksDBSegmentedBytesStore during init (#5276)
Date Tue, 17 Jul 2018 21:04:57 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 08fe24b  KAFKA-7103: Use bulkloading for RocksDBSegmentedBytesStore during init (#5276)
08fe24b is described below

commit 08fe24b46a327424e3249eeef1a32e05db717b18
Author: Liquan Pei <liquanpei@gmail.com>
AuthorDate: Tue Jul 17 14:04:51 2018 -0700

    KAFKA-7103: Use bulkloading for RocksDBSegmentedBytesStore during init (#5276)
    
    This PR uses bulk loading for recovering RocksDBWindowStore, the same approach already used by RocksDBStore.
    
    Reviewers: Boyang Chen <bchen11@outlook.com>, Shawn Nguyen <shnguyen@pinterest.com>,
    Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
    Guozhang Wang <wangguoz@gmail.com>
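
A minimal, hypothetical sketch of what "bulk loading" means at the RocksDB options level
(this is not code from the commit; the class and method names are made up for illustration).
The trigger values below (1 << 30 while loading, 4 afterwards) match the assertions in
RocksDBSegmentedBytesStoreTest further down; note that the real store closes and reopens the
database with the new options rather than mutating them in place.

    import org.rocksdb.Options;

    class BulkLoadOptionsSketch {
        private final Options options = new Options();

        // Switch the options into or out of bulk-load mode around a restore.
        Options optionsFor(final boolean prepareForBulkload) {
            if (prepareForBulkload) {
                // prepareForBulkLoad() disables auto-compactions and raises the
                // level-0 file compaction trigger (to 1 << 30), so restored records
                // can be written without compactions kicking in mid-restore.
                options.prepareForBulkLoad();
            } else {
                // Once the restore has finished, fall back to the normal trigger so
                // compactions resume.
                options.setLevel0FileNumCompactionTrigger(4);
            }
            return options;
        }
    }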
---
 .../internals/RocksDBSegmentedBytesStore.java      | 106 +++++++++++++++++++--
 .../streams/state/internals/RocksDBStore.java      |  14 ++-
 .../kafka/streams/state/internals/Segment.java     |   2 +
 .../internals/RocksDBSegmentedBytesStoreTest.java  |  66 +++++++++++++
 4 files changed, 173 insertions(+), 15 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index c5d15d6..5f1ec37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -16,26 +16,36 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
-    private final static Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
-
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
     private final String name;
     private final Segments segments;
     private final KeySchema keySchema;
     private InternalProcessorContext context;
     private volatile boolean open;
+    private Set<Segment> bulkLoadSegments;
 
     RocksDBSegmentedBytesStore(final String name,
                                final long retention,
@@ -131,15 +141,11 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
 
         segments.openExisting(this.context);
 
+        bulkLoadSegments = new HashSet<>(segments.allSegments());
+
         // register and possibly restore the state from the logs
-        context.register(root, new StateRestoreCallback() {
-            @Override
-            public void restore(byte[] key, byte[] value) {
-                put(Bytes.wrap(key), value);
-            }
-        });
+        context.register(root, new RocksDBSegmentsBatchingRestoreCallback());
 
-        flush();
         open = true;
     }
 
@@ -164,4 +170,84 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
         return open;
     }
 
+    // Visible for testing
+    List<Segment> getSegments() {
+        return segments.allSegments();
+    }
+
+    // Visible for testing
+    void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
+        try {
+            final Map<Segment, WriteBatch> writeBatchMap = getWriteBatches(records);
+            for (final Map.Entry<Segment, WriteBatch> entry : writeBatchMap.entrySet()) {
+                final Segment segment = entry.getKey();
+                final WriteBatch batch = entry.getValue();
+                segment.write(batch);
+            }
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
+        }
+    }
+
+    // Visible for testing
+    Map<Segment, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
+        final Map<Segment, WriteBatch> writeBatchMap = new HashMap<>();
+        for (final KeyValue<byte[], byte[]> record : records) {
+            final long segmentId = segments.segmentId(keySchema.segmentTimestamp(Bytes.wrap(record.key)));
+            final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+            if (segment != null) {
+                // This handles the case where the state store has been moved to a new client that does
+                // not have a local RocksDB instance for the segment. In that case, toggleDbForBulkLoading
+                // will simply close the database and open it again with bulk loading enabled.
+                if (!bulkLoadSegments.contains(segment)) {
+                    segment.toggleDbForBulkLoading(true);
+                    // If the store does not exist yet, getOrCreateSegmentIfLive will call openDB, which
+                    // sets the open flag for the newly created store.
+                    // If the store does exist already, then toggleDbForBulkLoading makes sure that
+                    // the store is open at this point.
+                    bulkLoadSegments = new HashSet<>(segments.allSegments());
+                }
+                try {
+                    final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
+                    if (record.value == null) {
+                        batch.remove(record.key);
+                    } else {
+                        batch.put(record.key, record.value);
+                    }
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
+                }
+            }
+        }
+        return writeBatchMap;
+    }
+
+    private void toggleForBulkLoading(final boolean prepareForBulkload) {
+        for (final Segment segment: segments.allSegments()) {
+            segment.toggleDbForBulkLoading(prepareForBulkload);
+        }
+    }
+
+    private class RocksDBSegmentsBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
+
+        @Override
+        public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
+            restoreAllInternal(records);
+        }
+
+        @Override
+        public void onRestoreStart(final TopicPartition topicPartition,
+                                   final String storeName,
+                                   final long startingOffset,
+                                   final long endingOffset) {
+            toggleForBulkLoading(true);
+        }
+
+        @Override
+        public void onRestoreEnd(final TopicPartition topicPartition,
+                                 final String storeName,
+                                 final long totalRestored) {
+            toggleForBulkLoading(false);
+        }
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 17d03cc..cb00747 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -90,7 +90,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
     private FlushOptions fOptions;
 
     private volatile boolean prepareForBulkload = false;
-    private ProcessorContext internalProcessorContext;
+    ProcessorContext internalProcessorContext;
     // visible for testing
     volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null;
 
@@ -230,7 +230,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         }
     }
 
-    private void toggleDbForBulkLoading(final boolean prepareForBulkload) {
+    void toggleDbForBulkLoading(final boolean prepareForBulkload) {
 
         if (prepareForBulkload) {
             // if the store is not empty, we need to compact to get around the num.levels check
@@ -276,7 +276,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         return originalValue;
     }
 
-    private void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
+    void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
         try (final WriteBatch batch = new WriteBatch()) {
             for (final KeyValue<byte[], byte[]> record : records) {
                 if (record.value == null) {
@@ -285,7 +285,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
                     batch.put(record.key, record.value);
                 }
             }
-            db.write(wOptions, batch);
+            write(batch);
         } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
         }
@@ -310,6 +310,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         }
     }
 
+    void write(final WriteBatch batch) throws RocksDBException {
+        db.write(wOptions, batch);
+    }
+
     @Override
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         try (final WriteBatch batch = new WriteBatch()) {
@@ -321,7 +325,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
                     batch.put(entry.key.get(), entry.value);
                 }
             }
-            db.write(wOptions, batch);
+            write(batch);
         } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index dbe470e..d107812 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -39,10 +39,12 @@ class Segment extends RocksDBStore implements Comparable<Segment> {
         return Long.compare(id, segment.id);
     }
 
+
     @Override
     public void openDB(final ProcessorContext context) {
         super.openDB(context);
         // skip the registering step
+        internalProcessorContext = context;
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 6b9e7a8..8e69ccb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -32,12 +33,14 @@ import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 import org.junit.runners.Parameterized.Parameter;
+import org.rocksdb.WriteBatch;
 
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 
@@ -46,10 +49,12 @@ import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SimpleTimeZone;
 
@@ -291,6 +296,67 @@ public class RocksDBSegmentedBytesStoreTest {
         bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100));
     }
 
+    @Test
+    public void shouldCreateWriteBatches() {
+        final String key = "a";
+        final Collection<KeyValue<byte[], byte[]>> records = new ArrayList<>();
+        records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[0])).get(), serializeValue(50L)));
+        records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[3])).get(), serializeValue(100L)));
+        final Map<Segment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records);
+        assertEquals(2, writeBatchMap.size());
+        for (final WriteBatch batch: writeBatchMap.values()) {
+            assertEquals(1, batch.count());
+        }
+    }
+
+    @Test
+    public void shouldRestoreToByteStore() {
+        // 0 segments initially.
+        assertEquals(0, bytesStore.getSegments().size());
+        final String key = "a";
+        final Collection<KeyValue<byte[], byte[]>> records = new ArrayList<>();
+        records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[0])).get(), serializeValue(50L)));
+        records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[3])).get(), serializeValue(100L)));
+        bytesStore.restoreAllInternal(records);
+
+        // 2 segments are created during restoration.
+        assertEquals(2, bytesStore.getSegments().size());
+
+        // Bulk loading is enabled during recovery.
+        for (final Segment segment: bytesStore.getSegments()) {
+            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
+        }
+
+        final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
+        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+        expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
+
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
+        assertEquals(expected, results);
+    }
+
+    @Test
+    public void shouldRespectBulkLoadOptionsDuringInit() {
+        bytesStore.init(context, bytesStore);
+        final String key = "a";
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
+        assertEquals(2, bytesStore.getSegments().size());
+
+        final StateRestoreListener restoreListener = context.getRestoreListener(bytesStore.name());
+
+        restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
+
+        for (final Segment segment: bytesStore.getSegments()) {
+            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
+        }
+
+        restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
+        for (final Segment segment: bytesStore.getSegments()) {
+            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4));
+        }
+    }
+
     private Set<String> segmentDirs() {
         File windowDir = new File(stateDir, storeName);
 

