kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5733: RocksDB bulk load with lower number of levels
Date Thu, 17 Aug 2017 21:02:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 72eacbea5 -> efe4f6540


KAFKA-5733: RocksDB bulk load with lower number of levels

This is to complete Bill's PR #3664 on KAFKA-5733, incorporating the suggestion in https://github.com/facebook/rocksdb/issues/2734.

Some minor changes: moved the `open = true` assignment into `openDB`.

Author: Bill Bejeck <bill@confluent.io>
Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Damian Guy <damian.guy@gmail.com>

Closes #3681 from guozhangwang/K5733-rocksdb-bulk-load


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/efe4f654
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/efe4f654
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/efe4f654

Branch: refs/heads/trunk
Commit: efe4f6540a3c26371c88b3f956bbd4abfe9dc52e
Parents: 72eacbe
Author: Bill Bejeck <bill@confluent.io>
Authored: Thu Aug 17 14:02:10 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Aug 17 14:02:10 2017 -0700

----------------------------------------------------------------------
 .../streams/state/internals/RocksDBStore.java   | 37 +++++++++--
 .../kafka/streams/state/internals/Segment.java  |  2 -
 .../state/internals/RocksDBStoreTest.java       | 66 +++++++++++++++-----
 3 files changed, 83 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/efe4f654/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index f8e9002..f142431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -47,6 +47,7 @@ import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Collection;
@@ -172,11 +173,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
             valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
+
         try {
             this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
         } catch (IOException e) {
             throw new ProcessorStateException(e);
         }
+
+        open = true;
     }
 
     public void init(ProcessorContext context, StateStore root) {
@@ -188,8 +192,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
         context.register(root, false, this.batchingStateRestoreCallback);
-
-        open = true;
     }
 
     private RocksDB openDB(File dir, Options options, int ttl) throws IOException {
@@ -254,10 +256,34 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
     }
 
     private void toggleDbForBulkLoading(boolean prepareForBulkload) {
+
+        if (prepareForBulkload) {
+            // if the store is not empty, we need to compact to get around the num.levels
check
+            // for bulk loading
+            final String[] sstFileNames = dbDir.list(new FilenameFilter() {
+                @Override
+                public boolean accept(File dir, String name) {
+                    return name.matches(".*\\.sst");
+                }
+            });
+
+            if (sstFileNames != null && sstFileNames.length > 0) {
+                try {
+                    this.db.compactRange(true, 1, 0);
+                } catch (RocksDBException e) {
+                    throw new ProcessorStateException("Error while range compacting during
restoring  store " + this.name, e);
+                }
+
+                // we need to re-open with the old num.levels again, this is a workaround
+                // until https://github.com/facebook/rocksdb/pull/2740 is merged in rocksdb
+                close();
+                openDB(internalProcessorContext);
+            }
+        }
+
         close();
         this.prepareForBulkload = prepareForBulkload;
         openDB(internalProcessorContext);
-        open = true;
     }
 
     @SuppressWarnings("unchecked")
@@ -435,7 +461,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
     }
 
     private void closeOpenIterators() {
-        HashSet<KeyValueIterator> iterators = null;
+        HashSet<KeyValueIterator> iterators;
         synchronized (openIterators) {
             iterators = new HashSet<>(openIterators);
         }
@@ -530,7 +556,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
         }
     }
 
-    private static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback
{
+    // not private for testing
+    static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback
{
 
         private final RocksDBStore rocksDBStore;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/efe4f654/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index 1311a27..b9cc8b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -40,8 +40,6 @@ class Segment extends RocksDBStore<Bytes, byte[]> {
         super.openDB(context);
 
         // skip the registering step
-
-        open = true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/efe4f654/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 553a134..148762d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -38,6 +37,7 @@ import org.rocksdb.Options;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -76,6 +76,27 @@ public class RocksDBStoreTest {
     }
 
     @Test
+    public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() throws
Exception {
+        subject.init(context, subject);
+
+        final String message = "how can a 4 ounce bird carry a 2lb coconut";
+        int intKey = 1;
+        for (int i = 0; i < 2000000; i++) {
+            subject.put("theKeyIs" + intKey++, message);
+        }
+
+        final List<KeyValue<byte[], byte[]>> restoreBytes = new ArrayList<>();
+
+        final byte[] restoredKey = "restoredKey".getBytes("UTF-8");
+        final byte[] restoredValue = "restoredValue".getBytes("UTF-8");
+        restoreBytes.add(KeyValue.pair(restoredKey, restoredValue));
+
+        context.restore("test", restoreBytes);
+
+        assertThat(subject.get("restoredKey"), equalTo("restoredValue"));
+    }
+
+    @Test
     public void canSpecifyConfigSetterAsClass() throws Exception {
         final Map<String, Object> configs = new HashMap<>();
         configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
@@ -129,7 +150,8 @@ public class RocksDBStoreTest {
     @Test
     public void shouldTogglePrepareForBulkloadSetting() {
         subject.init(context, subject);
-        StateRestoreListener restoreListener = (StateRestoreListener) subject.batchingStateRestoreCallback;
+        RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
+            (RocksDBStore.RocksDBBatchingRestoreCallback) subject.batchingStateRestoreCallback;
 
         restoreListener.onRestoreStart(null, null, 0, 0);
         assertTrue("Should have set bulk loading to true", subject.isPrepareForBulkload());
@@ -139,11 +161,25 @@ public class RocksDBStoreTest {
     }
 
     @Test
+    public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() throws Exception
{
+        final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
+
+        subject.init(context, subject);
+        context.restore(subject.name(), entries);
+
+        RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
+            (RocksDBStore.RocksDBBatchingRestoreCallback) subject.batchingStateRestoreCallback;
+
+        restoreListener.onRestoreStart(null, null, 0, 0);
+        assertTrue("Should have not set bulk loading to true", subject.isPrepareForBulkload());
+
+        restoreListener.onRestoreEnd(null, null, 0);
+        assertFalse("Should have set bulk loading to false", subject.isPrepareForBulkload());
+    }
+
+    @Test
     public void shouldRestoreAll() throws Exception {
-        final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
-        entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
-        entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
-        entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
+        final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
         subject.init(context, subject);
         context.restore(subject.name(), entries);
@@ -156,10 +192,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldHandleDeletesOnRestoreAll() throws Exception {
-        final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
-        entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
-        entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
-        entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
+        final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
         entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
 
         subject.init(context, subject);
@@ -205,11 +238,7 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldRestoreThenDeleteOnRestoreAll() throws Exception {
-
-        final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
-        entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
-        entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
-        entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
+        final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
         subject.init(context, subject);
         
@@ -301,6 +330,13 @@ public class RocksDBStoreTest {
         }
     }
 
+    private List<KeyValue<byte[], byte[]>> getKeyValueEntries() throws UnsupportedEncodingException
{
+        final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
+        return entries;
+    }
 
     private static class ConfigurableProcessorContext extends MockProcessorContext {
         final Map<String, Object> configs;


Mime
View raw message