kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6412 Improve synchronization in CachingKeyValueStore methods
Date Wed, 10 Jan 2018 10:24:32 GMT
This is an automated email from the ASF dual-hosted git repository.

damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 874eeb8  KAFKA-6412 Improve synchronization in CachingKeyValueStore methods
874eeb8 is described below

commit 874eeb88d9dc5e8fbcd681672fb90a4cd7597fec
Author: tedyu <yuzhihong@gmail.com>
AuthorDate: Wed Jan 10 10:21:51 2018 +0000

    KAFKA-6412 Improve synchronization in CachingKeyValueStore methods
    
    Currently CachingKeyValueStore methods are synchronized at method level.
    
    It seems we can use read lock for getter and write lock for put / delete methods.
    
    For getInternal(), if the calling thread is the streamThread, getInternal() may trigger
    eviction. This can be handled by obtaining the write lock at the beginning of the method
    when called from the streamThread.
    
    The jmh patch is attached to JIRA:
    https://issues.apache.org/jira/secure/attachment/12905140/6412-jmh.v1.txt
    
    Author: tedyu <yuzhihong@gmail.com>
    
    Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>
    
    Closes #4372 from tedyu/6412
---
 .../state/internals/CachingKeyValueStore.java      | 97 ++++++++++++++++------
 1 file changed, 71 insertions(+), 26 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index f0669a4..9fff8cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -31,6 +31,9 @@ import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<Bytes, byte[]>, CachedStateStore<K, V> {
 
@@ -44,6 +47,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     private InternalProcessorContext context;
     private StateSerdes<K, V> serdes;
     private Thread streamThread;
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
 
     CachingKeyValueStore(final KeyValueStore<Bytes, byte[]> underlying,
                          final Serde<K> keySerde,
@@ -108,9 +112,14 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public synchronized void flush() {
-        cache.flush(cacheName);
-        underlying.flush();
+    public void flush() {
+        lock.writeLock().lock();
+        try {
+            cache.flush(cacheName);
+            underlying.flush();
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     @Override
@@ -131,10 +140,21 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public synchronized byte[] get(final Bytes key) {
+    public byte[] get(final Bytes key) {
         validateStoreOpen();
-        Objects.requireNonNull(key);
-        return getInternal(key);
+        Lock theLock;
+        if (Thread.currentThread().equals(streamThread)) {
+            theLock = lock.writeLock();
+        } else {
+            theLock = lock.readLock();
+        }
+        theLock.lock();
+        try {
+            Objects.requireNonNull(key);
+            return getInternal(key);
+        } finally {
+            theLock.unlock();
+        }
     }
 
     private byte[] getInternal(final Bytes key) {
@@ -176,50 +196,75 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public synchronized long approximateNumEntries() {
+    public long approximateNumEntries() {
         validateStoreOpen();
-        return underlying.approximateNumEntries();
+        lock.readLock().lock();
+        try {
+            return underlying.approximateNumEntries();
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 
     @Override
-    public synchronized void put(final Bytes key, final byte[] value) {
+    public void put(final Bytes key, final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
-        putInternal(key, value);
+        lock.writeLock().lock();
+        try {
+            putInternal(key, value);
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
-    private synchronized void putInternal(final Bytes rawKey, final byte[] value) {
+    private void putInternal(final Bytes rawKey, final byte[] value) {
         Objects.requireNonNull(rawKey, "key cannot be null");
         cache.put(cacheName, rawKey, new LRUCacheEntry(value, true, context.offset(),
-                  context.timestamp(), context.partition(), context.topic()));
+              context.timestamp(), context.partition(), context.topic()));
     }
 
     @Override
-    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
-        final byte[] v = getInternal(key);
-        if (v == null) {
-            putInternal(key, value);
+        lock.writeLock().lock();
+        try {
+            final byte[] v = getInternal(key);
+            if (v == null) {
+                putInternal(key, value);
+            }
+            return v;
+        } finally {
+            lock.writeLock().unlock();
         }
-        return v;
     }
 
     @Override
-    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
-        for (KeyValue<Bytes, byte[]> entry : entries) {
-            put(entry.key, entry.value);
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        lock.writeLock().lock();
+        try {
+            for (KeyValue<Bytes, byte[]> entry : entries) {
+                put(entry.key, entry.value);
+            }
+        } finally {
+            lock.writeLock().unlock();
         }
     }
 
     @Override
-    public synchronized byte[] delete(final Bytes key) {
+    public byte[] delete(final Bytes key) {
         validateStoreOpen();
-        Objects.requireNonNull(key);
-        final byte[] v = getInternal(key);
-        cache.delete(cacheName, key);
-        underlying.delete(key);
-        return v;
+        lock.writeLock().lock();
+        try {
+            Objects.requireNonNull(key);
+            final byte[] v = getInternal(key);
+            cache.delete(cacheName, key);
+            underlying.delete(key);
+            return v;
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     KeyValueStore<Bytes, byte[]> underlying() {

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <commits@kafka.apache.org>'].

Mime
View raw message