kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7912: Support concurrent access in InMemoryKeyValueStore (#6336)
Date Thu, 28 Feb 2019 22:47:38 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0913623  KAFKA-7912: Support concurrent access in InMemoryKeyValueStore (#6336)
0913623 is described below

commit 09136235db2fcae5282a218de75b6356f9420031
Author: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
AuthorDate: Thu Feb 28 14:47:27 2019 -0800

    KAFKA-7912: Support concurrent access in InMemoryKeyValueStore (#6336)
    
    Previously the InMemoryKeyValue store would throw a ConcurrentModificationException if
the store was modified beneath an open iterator. The TreeMap implementation was swapped with
a ConcurrentSkipListMap for similar performance while supporting concurrent access.
    
    Added one test to AbstractKeyValueStoreTest, no existing tests caught this.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../state/internals/InMemoryKeyValueStore.java     | 29 +++++++++++-----------
 .../state/internals/AbstractKeyValueStoreTest.java | 11 ++++++++
 2 files changed, 25 insertions(+), 15 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index cc28d64..b37c39e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -26,18 +28,16 @@ import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Iterator;
 import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
 
 public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     private final String name;
-    private final NavigableMap<Bytes, byte[]> map;
+    private final ConcurrentNavigableMap<Bytes, byte[]> map;
     private volatile boolean open = false;
 
     public InMemoryKeyValueStore(final String name) {
         this.name = name;
 
-        this.map = new TreeMap<>();
+        this.map = new ConcurrentSkipListMap<>();
     }
 
     @Override
@@ -46,7 +46,6 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]>
{
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context,
                      final StateStore root) {
 
@@ -76,12 +75,12 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes,
byte[]> {
     }
 
     @Override
-    public synchronized byte[] get(final Bytes key) {
+    public byte[] get(final Bytes key) {
         return this.map.get(key);
     }
 
     @Override
-    public synchronized void put(final Bytes key, final byte[] value) {
+    public void put(final Bytes key, final byte[] value) {
         if (value == null) {
             this.map.remove(key);
         } else {
@@ -90,7 +89,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]>
{
     }
 
     @Override
-    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
         final byte[] originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
@@ -99,29 +98,29 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes,
byte[]> {
     }
 
     @Override
-    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries)
{
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         for (final KeyValue<Bytes, byte[]> entry : entries) {
             put(entry.key, entry.value);
         }
     }
 
     @Override
-    public synchronized byte[] delete(final Bytes key) {
+    public byte[] delete(final Bytes key) {
         return this.map.remove(key);
     }
 
     @Override
-    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
-                                                              final Bytes to) {
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to)
{
         return new DelegatingPeekingKeyValueIterator<>(
             name,
             new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator()));
     }
 
     @Override
-    public synchronized KeyValueIterator<Bytes, byte[]> all() {
-        final TreeMap<Bytes, byte[]> copy = new TreeMap<>(this.map);
-        return new DelegatingPeekingKeyValueIterator<>(name, new InMemoryKeyValueIterator(copy.entrySet().iterator()));
+    public KeyValueIterator<Bytes, byte[]> all() {
+        return new DelegatingPeekingKeyValueIterator<>(
+            name,
+            new InMemoryKeyValueIterator(this.map.entrySet().iterator()));
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index f2f6b88..09a3736 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -378,4 +378,15 @@ public abstract class AbstractKeyValueStoreTest {
         store.delete(2);
         assertNull(store.get(2));
     }
+
+    @Test
+    public void shouldNotThrowConcurrentModificationException() {
+        store.put(0, "zero");
+
+        final KeyValueIterator<Integer, String> results = store.range(0, 2);
+
+        store.put(1, "one");
+
+        assertEquals(new KeyValue<>(0, "zero"), results.next());
+    }
 }


Mime
View raw message