kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [04/50] [abbrv] kafka git commit: HOTFIX: RocksDBStore must clear dirty flags after flush
Date Mon, 11 Apr 2016 23:09:20 GMT
HOTFIX: RocksDBStore must clear dirty flags after flush

guozhangwang
Without clearing the dirty flags, RocksDBStore will perform a flush for every new record. This
bug made the store performance painfully slow.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1163 from ymatsuda/clear_dirty_flag


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5089f547
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5089f547
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5089f547

Branch: refs/heads/0.10.0
Commit: 5089f547d5d64a0235e1b4adc327a0cb05eb4ca8
Parents: 43d5078
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Tue Mar 29 13:30:56 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Mar 29 13:30:56 2016 -0700

----------------------------------------------------------------------
 .../streams/state/internals/RocksDBStore.java   | 27 ++++++++++++--------
 1 file changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5089f547/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index b206f37..fe327f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -165,7 +165,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
                         public void apply(K key, RocksDBCacheEntry entry) {
                             // flush all the dirty entries to RocksDB if this evicted entry
is dirty
                             if (entry.isDirty) {
-                                flush();
+                                flushCache();
                             }
                         }
                     });
@@ -226,7 +226,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
             RocksDBCacheEntry entry = cache.get(key);
 
             if (entry == null) {
-                byte[] rawKey = serdes.rawKey(key);
                 V value = serdes.valueFrom(getInternal(serdes.rawKey(key)));
                 cache.put(key, new RocksDBCacheEntry(value));
 
@@ -251,8 +250,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
     @Override
     public void put(K key, V value) {
         if (cache != null) {
-            cache.put(key, new RocksDBCacheEntry(value, true));
             cacheDirtyKeys.add(key);
+            cache.put(key, new RocksDBCacheEntry(value, true));
         } else {
             byte[] rawKey = serdes.rawKey(key);
             byte[] rawValue = serdes.rawValue(value);
@@ -298,7 +297,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
             put(entry.key, entry.value);
     }
 
-    // this function is only called in flush()
+    // this function is only called in flushCache()
     private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) {
         WriteBatch batch = new WriteBatch();
 
@@ -324,7 +323,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
     public KeyValueIterator<K, V> range(K from, K to) {
         // we need to flush the cache if necessary before returning the iterator
         if (cache != null)
-            flush();
+            flushCache();
 
         return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
     }
@@ -333,15 +332,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
     public KeyValueIterator<K, V> all() {
         // we need to flush the cache if necessary before returning the iterator
         if (cache != null)
-            flush();
+            flushCache();
 
         RocksIterator innerIter = db.newIterator();
         innerIter.seekToFirst();
         return new RocksDbIterator<K, V>(innerIter, serdes);
     }
 
-    @Override
-    public void flush() {
+    private void flushCache() {
         // flush of the cache entries if necessary
         if (cache != null) {
             List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cache.keys.size());
@@ -350,7 +348,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
             for (K key : cacheDirtyKeys) {
                 RocksDBCacheEntry entry = cache.get(key);
 
-                assert entry.isDirty;
+                entry.isDirty = false;
 
                 byte[] rawKey = serdes.rawKey(key);
 
@@ -386,12 +384,19 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
             cacheDirtyKeys.clear();
         }
 
-        flushInternal();
-
         if (loggingEnabled)
             changeLogger.logChange(getter);
     }
 
+    @Override
+    public void flush() {
+        // flush of the cache entries if necessary
+        flushCache();
+
+        // flush RocksDB
+        flushInternal();
+    }
+
     public void flushInternal() {
         try {
             db.flush(fOptions);


Mime
View raw message