kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4492: Make the streams cache eviction policy tolerable to reentrant puts
Date Wed, 07 Dec 2016 20:15:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 febfa1169 -> dce2e3f59


KAFKA-4492: Make the streams cache eviction policy tolerable to reentrant puts

The NamedCache wasn't correctly dealing with its re-entrant nature. This would result in the
LRU becoming corrupted, and an IllegalStateException occurring during eviction. For example:
Cache A: dirty key 1
eviction runs on Cache A
Node for key 1 gets marked as clean
Entry for key 1 gets flushed downstream
Downstream there is a processor that also refers to the table fronted by Cache A
Downstream processor puts key 2 into Cache A
This triggers an eviction of key 1 again (it is still the oldest node, as it hasn't been removed
from the LRU)
As the Node for key 1 is clean, flush doesn't run and the node is immediately removed from the cache.
So now we have the dirtyKeys set containing key 1, but the value doesn't exist in the cache.
Downstream processor tries to put key 1 into the cache; it fails as key 1 is still in the dirtyKeys set.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2226 from dguy/cache-bug


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dce2e3f5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dce2e3f5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dce2e3f5

Branch: refs/heads/0.10.1
Commit: dce2e3f593aba3195ceef5ec506b2182d51505eb
Parents: febfa11
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Dec 7 12:08:34 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Dec 7 12:14:43 2016 -0800

----------------------------------------------------------------------
 .../streams/state/internals/NamedCache.java     | 55 ++++++++++++---
 .../streams/state/internals/ThreadCache.java    | 13 +++-
 .../internals/KTableKTableLeftJoinTest.java     | 74 +++++++++++++++++++-
 .../streams/state/internals/NamedCacheTest.java | 62 ++++++++++++++++
 .../state/internals/ThreadCacheTest.java        | 34 +++++++++
 5 files changed, 224 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dce2e3f5/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index ab771df..07968a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -92,10 +92,14 @@ class NamedCache {
     }
 
     synchronized void flush() {
+        flush(null);
+    }
+
+    private void flush(final LRUNode evicted) {
         numFlushes++;
 
         log.debug("Named cache {} stats on flush: #hits={}, #misses={}, #overwrites={}, #flushes={}",
-            name, hits(), misses(), overwrites(), flushes());
+                  name, hits(), misses(), overwrites(), flushes());
 
         if (listener == null) {
             throw new IllegalArgumentException("No listener for namespace " + name + " registered
with cache");
@@ -106,6 +110,14 @@ class NamedCache {
         }
 
         final List<ThreadCache.DirtyEntry> entries  = new ArrayList<>();
+
+        // evicted already been removed from the cache so add it to the list of
+        // flushed entries and remove from dirtyKeys.
+        if (evicted != null) {
+            entries.add(new ThreadCache.DirtyEntry(evicted.key, evicted.entry.value, evicted.entry));
+            dirtyKeys.remove(evicted.key);
+        }
+
         for (Bytes key : dirtyKeys) {
             final LRUNode node = getInternal(key);
             if (node == null) {
@@ -114,11 +126,14 @@ class NamedCache {
             entries.add(new ThreadCache.DirtyEntry(key, node.entry.value, node.entry));
             node.entry.markClean();
         }
-        listener.apply(entries);
+        // clear dirtyKeys before the listener is applied as it may be re-entrant.
         dirtyKeys.clear();
+        listener.apply(entries);
     }
 
 
+
+
     synchronized void put(final Bytes key, final LRUCacheEntry value) {
         if (!value.isDirty && dirtyKeys.contains(key)) {
             throw new IllegalStateException(String.format("Attempting to put a clean entry
for key [%s] " +
@@ -201,11 +216,11 @@ class NamedCache {
         }
         final LRUNode eldest = tail;
         currentSizeBytes -= eldest.size();
-        if (eldest.entry.isDirty()) {
-            flush();
-        }
         remove(eldest);
         cache.remove(eldest.key);
+        if (eldest.entry.isDirty()) {
+            flush(eldest);
+        }
     }
 
     synchronized LRUCacheEntry putIfAbsent(final Bytes key, final LRUCacheEntry value) {
@@ -269,6 +284,14 @@ class NamedCache {
         return tail.entry;
     }
 
+    synchronized LRUNode head() {
+        return head;
+    }
+
+    synchronized LRUNode tail() {
+        return tail;
+    }
+
     synchronized long dirtySize() {
         return dirtyKeys.size();
     }
@@ -276,7 +299,7 @@ class NamedCache {
     /**
      * A simple wrapper class to implement a doubly-linked list around MemoryLRUCacheBytesEntry
      */
-    private class LRUNode {
+    class LRUNode {
         private final Bytes key;
         private LRUCacheEntry entry;
         private LRUNode previous;
@@ -287,21 +310,33 @@ class NamedCache {
             this.entry = entry;
         }
 
-        public LRUCacheEntry entry() {
+        LRUCacheEntry entry() {
             return entry;
         }
 
-        public void update(LRUCacheEntry entry) {
-            this.entry = entry;
+        Bytes key() {
+            return key;
         }
 
-        public long size() {
+        long size() {
             return  key.get().length +
                     8 + // entry
                     8 + // previous
                     8 + // next
                     entry.size();
         }
+
+        LRUNode next() {
+            return next;
+        }
+
+        LRUNode previous() {
+            return previous;
+        }
+
+        private void update(LRUCacheEntry entry) {
+            this.entry = entry;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dce2e3f5/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index f7355d8..3d9d0b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -100,7 +100,7 @@ public class ThreadCache {
         cache.flush();
 
         log.debug("Thread {} cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
-            name, puts(), gets(), evicts(), flushes());
+                  name, puts(), gets(), evicts(), flushes());
     }
 
     public LRUCacheEntry get(final String namespace, byte[] key) {
@@ -115,7 +115,6 @@ public class ThreadCache {
 
     public void put(final String namespace, byte[] key, LRUCacheEntry value) {
         numPuts++;
-
         final NamedCache cache = getOrCreateCache(namespace);
         cache.put(Bytes.wrap(key), value);
         maybeEvict(namespace);
@@ -195,8 +194,15 @@ public class ThreadCache {
     private void maybeEvict(final String namespace) {
         while (sizeBytes() > maxCacheSizeBytes) {
             final NamedCache cache = getOrCreateCache(namespace);
+            // we abort here as the put on this cache may have triggered
+            // a put on another cache. So even though the sizeInBytes() is
+            // still > maxCacheSizeBytes there is nothing to evict from this
+            // namespaced cache.
+            if (cache.size() == 0) {
+                return;
+            }
+            log.trace("Thread {} evicting cache {}", name, namespace);
             cache.evict();
-
             numEvicts++;
         }
     }
@@ -323,4 +329,5 @@ public class ThreadCache {
         }
 
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dce2e3f5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 5f84678..98b49ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -19,11 +19,14 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -35,6 +38,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Locale;
+import java.util.Random;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -322,6 +327,73 @@ public class KTableKTableLeftJoinTest {
         proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)",
"2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
     }
 
+    /**
+     * This test was written to reproduce https://issues.apache.org/jira/browse/KAFKA-4492
+     * It is based on a fairly complicated join used by the developer that reported the bug.
+     * Before the fix this would trigger an IllegalStateException.
+     */
+    @Test
+    public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() throws Exception
{
+        final String agg = "agg";
+        final String tableOne = "tableOne";
+        final String tableTwo = "tableTwo";
+        final String tableThree = "tableThree";
+        final String tableFour = "tableFour";
+        final String tableFive = "tableFive";
+        final String tableSix = "tableSix";
+        final String[] inputs = {agg, tableOne, tableTwo, tableThree, tableFour, tableFive,
tableSix};
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KTable<Long, String> aggTable = builder.table(Serdes.Long(), Serdes.String(),
agg, agg)
+                .groupBy(new KeyValueMapper<Long, String, KeyValue<Long, String>>()
{
+                    @Override
+                    public KeyValue<Long, String> apply(final Long key, final String
value) {
+                        return new KeyValue<>(key, value);
+                    }
+                }, Serdes.Long(), Serdes.String()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER,
"agg-store");
+
+        final KTable<Long, String> one = builder.table(Serdes.Long(), Serdes.String(),
tableOne, tableOne);
+        final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(),
tableTwo, tableTwo);
+        final KTable<Long, String> three = builder.table(Serdes.Long(), Serdes.String(),
tableThree, tableThree);
+        final KTable<Long, String> four = builder.table(Serdes.Long(), Serdes.String(),
tableFour, tableFour);
+        final KTable<Long, String> five = builder.table(Serdes.Long(), Serdes.String(),
tableFive, tableFive);
+        final KTable<Long, String> six = builder.table(Serdes.Long(), Serdes.String(),
tableSix, tableSix);
+
+        final ValueMapper<String, String> mapper = new ValueMapper<String, String>()
{
+            @Override
+            public String apply(final String value) {
+                return value.toUpperCase(Locale.ROOT);
+            }
+        };
+        final KTable<Long, String> seven = one.mapValues(mapper);
+
+
+        final KTable<Long, String> eight = six.leftJoin(seven, MockValueJoiner.STRING_JOINER);
+
+        aggTable.leftJoin(one, MockValueJoiner.STRING_JOINER)
+                .leftJoin(two, MockValueJoiner.STRING_JOINER)
+                .leftJoin(three, MockValueJoiner.STRING_JOINER)
+                .leftJoin(four, MockValueJoiner.STRING_JOINER)
+                .leftJoin(five, MockValueJoiner.STRING_JOINER)
+                .leftJoin(eight, MockValueJoiner.STRING_JOINER)
+                .mapValues(mapper);
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, 250);
+
+        final String[] values = {"a", "AA", "BBB", "CCCC", "DD", "EEEEEEEE", "F", "GGGGGGGGGGGGGGG",
"HHH", "IIIIIIIIII",
+                                 "J", "KK", "LLLL", "MMMMMMMMMMMMMMMMMMMMMM", "NNNNN", "O",
"P", "QQQQQ", "R", "SSSS",
+                                 "T", "UU", "VVVVVVVVVVVVVVVVVVV"};
+
+        final Random random = new Random();
+        for (int i = 0; i < 1000; i++) {
+            for (String input : inputs) {
+                final Long key = Long.valueOf(random.nextInt(1000));
+                final String value = values[random.nextInt(values.length)];
+                driver.process(input, key, value);
+            }
+        }
+    }
+
     private KeyValue<Integer, String> kv(Integer key, String value) {
         return new KeyValue<>(key, value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dce2e3f5/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 5c0d511..0a782d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 
 public class NamedCacheTest {
 
@@ -197,4 +198,65 @@ public class NamedCacheTest {
         cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, true, 0, 0,
0, ""));
         cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, false, 0,
0, 0, ""));
     }
+
+    @Test
+    public void shouldBeReentrantAndNotBreakLRU() throws Exception {
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, "");
+        final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
+        cache.put(Bytes.wrap(new byte[]{0}), dirty);
+        cache.put(Bytes.wrap(new byte[]{1}), clean);
+        cache.put(Bytes.wrap(new byte[]{2}), clean);
+        assertEquals(3 * cache.head().size(), cache.sizeInBytes());
+        cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                cache.put(Bytes.wrap(new byte[]{3}), clean);
+                // evict key 1
+                cache.evict();
+                // evict key 2
+                cache.evict();
+            }
+        });
+
+        assertEquals(3 * cache.head().size(), cache.sizeInBytes());
+        // Evict key 0
+        cache.evict();
+        final Bytes entryFour = Bytes.wrap(new byte[]{4});
+        cache.put(entryFour, dirty);
+
+        // check that the LRU is still correct
+        final NamedCache.LRUNode head = cache.head();
+        final NamedCache.LRUNode tail = cache.tail();
+        assertEquals(2, cache.size());
+        assertEquals(2 * head.size(), cache.sizeInBytes());
+        // dirty should be the newest
+        assertEquals(entryFour, head.key());
+        assertEquals(Bytes.wrap(new byte[] {3}), tail.key());
+        assertSame(tail, head.next());
+        assertNull(head.previous());
+        assertSame(head, tail.previous());
+        assertNull(tail.next());
+
+        // evict key 3
+        cache.evict();
+        assertSame(cache.head(), cache.tail());
+        assertEquals(entryFour, cache.head().key());
+        assertNull(cache.head().next());
+        assertNull(cache.head().previous());
+    }
+
+    @Test
+    public void shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey()
throws Exception {
+        final LRUCacheEntry dirty = new LRUCacheEntry(new byte[]{3}, true, 0, 0, 0, "");
+        final LRUCacheEntry clean = new LRUCacheEntry(new byte[]{3});
+        final Bytes key = Bytes.wrap(new byte[] {3});
+        cache.setListener(new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                cache.put(key, clean);
+            }
+        });
+        cache.put(key, dirty);
+        cache.evict();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/dce2e3f5/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index b07da6e..1049b91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -459,6 +459,40 @@ public class ThreadCacheTest {
         assertEquals(cache.evicts(), 3);
     }
 
+    @Test
+    public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() throws Exception
{
+        final int maxCacheSizeInBytes = 100;
+        final ThreadCache threadCache = new ThreadCache(maxCacheSizeInBytes);
+        // trigger a put into another cache on eviction from "name"
+        threadCache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener()
{
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                // put an item into an empty cache when the total cache size
+                // is already > than maxCacheSizeBytes
+                threadCache.put("other", new byte[]{0}, dirtyEntry(new byte[2]));
+            }
+        });
+        threadCache.addDirtyEntryFlushListener("other", new ThreadCache.DirtyEntryFlushListener()
{
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+               //
+            }
+        });
+        threadCache.addDirtyEntryFlushListener("another", new ThreadCache.DirtyEntryFlushListener()
{
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+
+            }
+        });
+
+        threadCache.put("another", new byte[]{1}, dirtyEntry(new byte[1]));
+        threadCache.put("name", new byte[]{1}, dirtyEntry(new byte[1]));
+        // Put a large item such that when the eldest item is removed
+        // cache sizeInBytes() > maxCacheSizeBytes
+        int remaining = (int) (maxCacheSizeInBytes - threadCache.sizeBytes());
+        threadCache.put("name", new byte[]{2}, dirtyEntry(new byte[remaining + 100]));
+    }
+
     private LRUCacheEntry dirtyEntry(final byte[] key) {
         return new LRUCacheEntry(key, true, -1, -1, -1, "");
     }


Mime
View raw message