kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4516: When a CachingStateStore is closed it should clear its associated NamedCache
Date Tue, 13 Dec 2016 02:00:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk efeaf1298 -> 21d7e6f19


KAFKA-4516: When a CachingStateStore is closed it should clear its associated NamedCache

Clear and remove the NamedCache from the ThreadCache when a CachingKeyValueStore or CachingWindowStore
is closed.
Validate that the store is open when doing any queries or writes to Caching State Stores.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2235 from dguy/kafka-4516


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21d7e6f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21d7e6f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21d7e6f1

Branch: refs/heads/trunk
Commit: 21d7e6f19bd36a7ad16291294fc933f9abfac9b7
Parents: efeaf12
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Dec 12 18:00:00 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Dec 12 18:00:00 2016 -0800

----------------------------------------------------------------------
 .../state/internals/CachingKeyValueStore.java   | 15 +++++
 .../state/internals/CachingWindowStore.java     | 10 ++++
 .../streams/state/internals/NamedCache.java     |  8 +++
 .../streams/state/internals/ThreadCache.java    |  9 +++
 .../internals/CachingKeyValueStoreTest.java     | 62 +++++++++++++++++++-
 .../state/internals/CachingWindowStoreTest.java | 22 +++++++
 .../state/internals/ThreadCacheTest.java        | 11 ++++
 7 files changed, 135 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index ab050b6..cfe6bd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -115,6 +116,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
     public void close() {
         flush();
         underlying.close();
+        cache.close(name);
     }
 
     @Override
@@ -129,10 +131,17 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
 
     @Override
     public synchronized V get(final K key) {
+        validateStoreOpen();
         final byte[] rawKey = serdes.rawKey(key);
         return get(rawKey);
     }
 
+    private void validateStoreOpen() {
+        if (!isOpen()) {
+            throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
+        }
+    }
+
     private V get(final byte[] rawKey) {
         final LRUCacheEntry entry = cache.get(name, rawKey);
         if (entry == null) {
@@ -157,6 +166,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
 
     @Override
     public KeyValueIterator<K, V> range(final K from, final K to) {
+        validateStoreOpen();
         final byte[] origFrom = serdes.rawKey(from);
         final byte[] origTo = serdes.rawKey(to);
         final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(underlying.range(Bytes.wrap(origFrom), Bytes.wrap(origTo)));
@@ -166,6 +176,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
 
     @Override
     public KeyValueIterator<K, V> all() {
+        validateStoreOpen();
         final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(underlying.all());
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name);
         return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
@@ -173,11 +184,13 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
 
     @Override
     public synchronized long approximateNumEntries() {
+        validateStoreOpen();
         return underlying.approximateNumEntries();
     }
 
     @Override
     public synchronized void put(final K key, final V value) {
+        validateStoreOpen();
         put(serdes.rawKey(key), value);
     }
 
@@ -189,6 +202,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
 
     @Override
     public synchronized V putIfAbsent(final K key, final V value) {
+        validateStoreOpen();
         final byte[] rawKey = serdes.rawKey(key);
         final V v = get(rawKey);
         if (v == null) {
@@ -206,6 +220,7 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
 
     @Override
     public synchronized V delete(final K key) {
+        validateStoreOpen();
         final byte[] rawKey = serdes.rawKey(key);
         final V v = get(rawKey);
         put(rawKey, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 304a206..71856fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
@@ -123,6 +124,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
     public void close() {
         flush();
         underlying.close();
+        cache.close(name);
     }
 
     @Override
@@ -143,6 +145,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
 
     @Override
     public synchronized void put(final K key, final V value, final long timestamp) {
+        validateStoreOpen();
         final byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                                                       timestamp, context.partition(), context.topic());
@@ -151,6 +154,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
 
     @Override
     public synchronized WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+        validateStoreOpen();
         byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
         byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, 0, serdes);
 
@@ -167,4 +171,10 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
             return serdes.valueFrom(iterator.next().value);
         }
     }
+
+    private void validateStoreOpen() {
+        if (!isOpen()) {
+            throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 07968a9..4272f2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -296,6 +296,14 @@ class NamedCache {
         return dirtyKeys.size();
     }
 
+    synchronized void close() {
+        head = tail = null;
+        listener = null;
+        currentSizeBytes = 0;
+        dirtyKeys.clear();
+        cache.clear();
+    }
+
     /**
      * A simple wrapper class to implement a doubly-linked list around MemoryLRUCacheBytesEntry
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 3d9d0b8..c6c3030 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -49,6 +49,8 @@ public class ThreadCache {
     private long numEvicts = 0;
     private long numFlushes = 0;
 
+
+
     public interface DirtyEntryFlushListener {
         void apply(final List<DirtyEntry> dirty);
     }
@@ -191,6 +193,13 @@ public class ThreadCache {
         return sizeInBytes;
     }
 
+    synchronized void close(final String namespace) {
+        final NamedCache removed = caches.remove(namespace);
+        if (removed != null) {
+            removed.close();
+        }
+    }
+
     private void maybeEvict(final String namespace) {
         while (sizeBytes() > maxCacheSizeBytes) {
             final NamedCache cache = getOrCreateCache(namespace);

http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 23f8a6a..60eed96 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -30,6 +32,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,10 +45,10 @@ import static org.junit.Assert.assertNull;
 
 public class CachingKeyValueStoreTest {
 
+    private final int maxCacheSizeBytes = 150;
     private CachingKeyValueStore<String, String> store;
     private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
     private ThreadCache cache;
-    private int maxCacheSizeBytes;
     private CacheFlushListenerStub<String> cacheFlushListener;
     private String topic;
 
@@ -56,7 +59,6 @@ public class CachingKeyValueStoreTest {
         cacheFlushListener = new CacheFlushListenerStub<>();
         store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
         store.setFlushListener(cacheFlushListener);
-        maxCacheSizeBytes = 150;
         cache = new ThreadCache(maxCacheSizeBytes);
         final MockProcessorContext context = new MockProcessorContext(null, null, null, null, (RecordCollector) null, cache);
         topic = "topic";
@@ -149,6 +151,62 @@ public class CachingKeyValueStoreTest {
         assertFalse(store.all().hasNext());
     }
 
+    @Test
+    public void shouldClearNamespaceCacheOnClose() throws Exception {
+        store.put("a", "a");
+        assertEquals(1, cache.size());
+        store.close();
+        assertEquals(0, cache.size());
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToGetFromClosedCachingStore() throws Exception {
+        store.close();
+        store.get("a");
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception {
+        store.close();
+        store.put("a", "a");
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() throws Exception {
+        store.close();
+        store.range("a", "b");
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToDoAllQueryOnClosedCachingStore() throws Exception {
+        store.close();
+        store.all();
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToDoGetApproxSizeOnClosedCachingStore() throws Exception {
+        store.close();
+        store.approximateNumEntries();
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToDoPutAllClosedCachingStore() throws Exception {
+        store.close();
+        store.putAll(Collections.singletonList(KeyValue.pair("a", "a")));
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToDoPutIfAbsentClosedCachingStore() throws Exception {
+        store.close();
+        store.putIfAbsent("b", "c");
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToDeleteFromClosedCachingStore() throws Exception {
+        store.close();
+        store.delete("key");
+    }
+
     private int addItemsToCache() throws IOException {
         int cachedSize = 0;
         int i = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 49e2db3..023fea6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -150,6 +151,27 @@ public class CachingWindowStoreTest {
         assertFalse(fetch.hasNext());
     }
 
+    @Test
+    public void shouldClearNamespaceCacheOnClose() throws Exception {
+        cachingStore.put("a", "a");
+        assertEquals(1, cache.size());
+        cachingStore.close();
+        assertEquals(0, cache.size());
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToFetchFromClosedCachingStore() throws Exception {
+        cachingStore.close();
+        cachingStore.fetch("a", 0, 10);
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowIfTryingToWriteToClosedCachingStore() throws Exception {
+        cachingStore.close();
+        cachingStore.put("a", "a");
+    }
+
+
     private int addItemsToCache() throws IOException {
         int cachedSize = 0;
         int i = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/21d7e6f1/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 1049b91..6c446ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -493,6 +493,17 @@ public class ThreadCacheTest {
         threadCache.put("name", new byte[]{2}, dirtyEntry(new byte[remaining + 100]));
     }
 
+    @Test
+    public void shouldCleanupNamedCacheOnClose() throws Exception {
+        final ThreadCache cache = new ThreadCache(100000);
+        cache.put("one", new byte[]{1}, cleanEntry(new byte[] {1}));
+        cache.put("two", new byte[]{1}, cleanEntry(new byte[] {1}));
+        assertEquals(cache.size(), 2);
+        cache.close("two");
+        assertEquals(cache.size(), 1);
+        assertNull(cache.get("two", new byte[] {1}));
+    }
+
     private LRUCacheEntry dirtyEntry(final byte[] key) {
         return new LRUCacheEntry(key, true, -1, -1, -1, "");
     }


Mime
View raw message