kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7652: Part III; Put to underlying before Flush (#6191)
Date Wed, 13 Feb 2019 06:36:51 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a1c269  KAFKA-7652: Part III; Put to underlying before Flush (#6191)
0a1c269 is described below

commit 0a1c26934757afae4dce49ff3ee038311ca6dd4a
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Tue Feb 12 22:36:40 2019 -0800

    KAFKA-7652: Part III; Put to underlying before Flush (#6191)
    
    1. In the caching layer's flush listener call, we should always write to the underlying
store before flushing (see point 4 of #4331 for a detailed explanation). The fix in #4331
only touched KV stores, but it turns out that the window and session stores need the same
fix as well.
    
    2. Also apply the optimization that was already in the session store: when the new value
bytes and old value bytes are both null (this is possible e.g. if there is a put(K, V) followed by
a remove(K) or put(K, null) and these two operations only hit the cache), upon flushing this
means the underlying store does not have this value at all and no intermediate value has
been sent downstream either. We can skip both putting a null to the underlying store as
well as calling the flush li [...]
    
    Modifies corresponding unit tests.
    
    Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Bill Bejeck <bill@confluent.io>
---
 .../state/internals/CachingKeyValueStore.java      | 44 ++++++++------
 .../state/internals/CachingSessionStore.java       | 37 +++++++-----
 .../state/internals/CachingWindowStore.java        | 68 +++++++++++-----------
 .../streams/state/internals/SessionKeySchema.java  |  7 +++
 .../streams/state/internals/WindowKeySchema.java   | 11 +++-
 .../integration/QueryableStateIntegrationTest.java |  2 +-
 .../kstream/internals/KTableFilterTest.java        | 44 +++++++++-----
 .../state/internals/AbstractKeyValueStoreTest.java | 37 +++++++-----
 .../state/internals/CachingKeyValueStoreTest.java  | 57 +++++++++++++++---
 .../state/internals/CachingSessionStoreTest.java   | 63 ++++++++++----------
 .../state/internals/CachingWindowStoreTest.java    | 35 ++++++++++-
 .../state/internals/RocksDBSessionStoreTest.java   | 18 +-----
 .../org/apache/kafka/test/StreamsTestUtils.java    |  9 +++
 13 files changed, 274 insertions(+), 158 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 9ecae5a..992466c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -85,27 +85,33 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore
im
 
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
                                     final InternalProcessorContext context) {
-        final ProcessorRecordContext current = context.recordContext();
-        try {
-            context.setRecordContext(entry.entry().context());
-            if (flushListener != null) {
-                V oldValue = null;
-                if (sendOldValues) {
-                    final byte[] oldBytesValue = underlying.get(entry.key());
-                    oldValue = oldBytesValue == null ? null : serdes.valueFrom(oldBytesValue);
-                }
-                // we rely on underlying store to handle null new value bytes as deletes
-                underlying.put(entry.key(), entry.newValue());
-                flushListener.apply(
-                    serdes.keyFrom(entry.key().get()),
-                    serdes.valueFrom(entry.newValue()),
-                    oldValue,
-                    entry.entry().context().timestamp());
-            } else {
+        if (flushListener != null) {
+            final byte[] newValueBytes = entry.newValue();
+            final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? underlying.get(entry.key())
: null;
+
+            // this is an optimization: if this key did not exist in underlying store and
also not in the cache,
+            // we can skip flushing to downstream as well as writing to underlying store
+            if (newValueBytes != null || oldValueBytes != null) {
+                final K key = serdes.keyFrom(entry.key().get());
+                final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes)
: null;
+                final V oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes)
: null;
+                // we need to get the old values if needed, and then put to store, and then
flush
                 underlying.put(entry.key(), entry.newValue());
+
+                final ProcessorRecordContext current = context.recordContext();
+                context.setRecordContext(entry.entry().context());
+                try {
+                    flushListener.apply(
+                        key,
+                        newValue,
+                        oldValue,
+                        entry.entry().context().timestamp());
+                } finally {
+                    context.setRecordContext(current);
+                }
             }
-        } finally {
-            context.setRecordContext(current);
+        } else {
+            underlying.put(entry.key(), entry.newValue());
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index a07f1ce..1c5c2f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -72,7 +72,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore
i
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);
 
-
         cacheName = context.taskId() + "-" + bytesStore.name();
         cache = context.getCache();
         cache.addDirtyEntryFlushListener(cacheName, entries -> {
@@ -178,27 +177,35 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore
i
 
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext
context) {
         final Bytes binaryKey = cacheFunction.key(entry.key());
-        final ProcessorRecordContext current = context.recordContext();
-        context.setRecordContext(entry.entry().context());
-        try {
-            final Windowed<K> key = SessionKeySchema.from(binaryKey.get(), serdes.keyDeserializer(),
topic);
-            final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
-            if (flushListener != null) {
-                final AGG newValue = serdes.valueFrom(entry.newValue());
-                final AGG oldValue = newValue == null || sendOldValues ?
-                    serdes.valueFrom(bytesStore.fetchSession(rawKey, key.window().start(),
key.window().end())) :
-                    null;
-                if (!(newValue == null && oldValue == null)) {
+        final Windowed<Bytes> bytesKey = SessionKeySchema.from(binaryKey);
+        if (flushListener != null) {
+            final byte[] newValueBytes = entry.newValue();
+            final byte[] oldValueBytes = newValueBytes == null || sendOldValues ?
+                bytesStore.fetchSession(bytesKey.key(), bytesKey.window().start(), bytesKey.window().end())
: null;
+
+            // this is an optimization: if this key did not exist in underlying store and
also not in the cache,
+            // we can skip flushing to downstream as well as writing to underlying store
+            if (newValueBytes != null || oldValueBytes != null) {
+                final Windowed<K> key = SessionKeySchema.from(bytesKey, serdes.keyDeserializer(),
topic);
+                final AGG newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes)
: null;
+                final AGG oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes)
: null;
+                // we need to get the old values if needed, and then put to store, and then
flush
+                bytesStore.put(bytesKey, entry.newValue());
+
+                final ProcessorRecordContext current = context.recordContext();
+                context.setRecordContext(entry.entry().context());
+                try {
                     flushListener.apply(
                         key,
                         newValue,
                         oldValue,
                         entry.entry().context().timestamp());
+                } finally {
+                    context.setRecordContext(current);
                 }
             }
-            bytesStore.put(new Windowed<>(rawKey, key.window()), entry.newValue());
-        } finally {
-            context.setRecordContext(current);
+        } else {
+            bytesStore.put(bytesKey, entry.newValue());
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 6112544..53d02dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -64,13 +64,13 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore
impl
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        initInternal(context);
+        initInternal((InternalProcessorContext) context);
         underlying.init(context, root);
     }
 
     @SuppressWarnings("unchecked")
-    private void initInternal(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+    private void initInternal(final InternalProcessorContext context) {
+        this.context = context;
         final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(),
underlying.name());
         serdes = new StateSerdes<>(topic,
                                    keySerde == null ? (Serde<K>) context.keySerde()
: keySerde,
@@ -84,34 +84,44 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore
impl
 
         cache.addDirtyEntryFlushListener(name, entries -> {
             for (final ThreadCache.DirtyEntry entry : entries) {
-                final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
-                final long timestamp = WindowKeySchema.extractStoreTimestamp(binaryWindowKey);
-
-                final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey,
windowSize, serdes.keyDeserializer(), serdes.topic());
-                final Bytes key = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(binaryWindowKey));
-                maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
-                underlying.put(key, entry.newValue(), timestamp);
+                putAndMaybeForward(entry, context);
             }
         });
     }
 
-    private void maybeForward(final ThreadCache.DirtyEntry entry,
-                              final Bytes key,
-                              final Windowed<K> windowedKey,
-                              final InternalProcessorContext context) {
+    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
+                                    final InternalProcessorContext context) {
+        final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
+        final Windowed<Bytes> windowedKeyBytes = WindowKeySchema.fromStoreBytesKey(binaryWindowKey,
windowSize);
+        final long windowStartTimestamp = windowedKeyBytes.window().start();
+        final Bytes key = windowedKeyBytes.key();
         if (flushListener != null) {
-            final ProcessorRecordContext current = context.recordContext();
-            context.setRecordContext(entry.entry().context());
-            try {
-                final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start())
: null;
-                flushListener.apply(
-                    windowedKey,
-                    serdes.valueFrom(entry.newValue()),
-                    oldValue,
-                    entry.entry().context().timestamp());
-            } finally {
-                context.setRecordContext(current);
+            final byte[] newValueBytes = entry.newValue();
+            final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? underlying.fetch(key,
windowStartTimestamp) : null;
+
+            // this is an optimization: if this key did not exist in underlying store and
also not in the cache,
+            // we can skip flushing to downstream as well as writing to underlying store
+            if (newValueBytes != null || oldValueBytes != null) {
+                final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(windowedKeyBytes,
serdes.keyDeserializer(), serdes.topic());
+                final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes)
: null;
+                final V oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes)
: null;
+                // we need to get the old values if needed, and then put to store, and then
flush
+                underlying.put(key, entry.newValue(), windowStartTimestamp);
+
+                final ProcessorRecordContext current = context.recordContext();
+                context.setRecordContext(entry.entry().context());
+                try {
+                    flushListener.apply(
+                        windowedKey,
+                        newValue,
+                        oldValue,
+                        entry.entry().context().timestamp());
+                } finally {
+                    context.setRecordContext(current);
+                }
             }
+        } else {
+            underlying.put(key, entry.newValue(), windowStartTimestamp);
         }
     }
 
@@ -231,14 +241,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore
impl
         );
     }
 
-    private V fetchPrevious(final Bytes key, final long timestamp) {
-        final byte[] value = underlying.fetch(key, timestamp);
-        if (value != null) {
-            return serdes.valueFrom(value);
-        }
-        return null;
-    }
-    
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
         validateStoreOpen();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 94f08ae..0c80da4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -132,6 +132,13 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema
{
         return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), window);
     }
 
+    public static <K> Windowed<K> from(final Windowed<Bytes> keyBytes,
+                                       final Deserializer<K> keyDeserializer,
+                                       final String topic) {
+        final K key = keyDeserializer.deserialize(topic, keyBytes.key().get());
+        return new Windowed<>(key, keyBytes.window());
+    }
+
     public static <K> byte[] toBinary(final Windowed<K> sessionKey,
                                       final Serializer<K> serializer,
                                       final String topic) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index f960b01..dd8a2f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -206,8 +206,15 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema
{
         return new Windowed<>(key, window);
     }
 
-    static Windowed<Bytes> fromStoreBytesKey(final byte[] binaryKey,
-                                             final long windowSize) {
+    public static <K> Windowed<K> fromStoreKey(final Windowed<Bytes> windowedKey,
+                                               final Deserializer<K> deserializer,
+                                               final String topic) {
+        final K key = deserializer.deserialize(topic, windowedKey.key().get());
+        return new Windowed<>(key, windowedKey.window());
+    }
+
+    public static Windowed<Bytes> fromStoreBytesKey(final byte[] binaryKey,
+                                                    final long windowSize) {
         final Bytes key = Bytes.wrap(extractStoreKeyBytes(binaryKey));
         final Window window = extractStoreWindow(binaryKey, windowSize);
         return new Windowed<>(key, window);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 856a85d..6889525 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -519,7 +519,7 @@ public class QueryableStateIntegrationTest {
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
-        waitUntilAtLeastNumRecordProcessed(outputTopic, 2);
+        waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore<String, Long>
             myFilterStore = kafkaStreams.store("queryFilter", QueryableStoreTypes.keyValueStore());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 8474565..1c4ba46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyTestDriverWrapper;
@@ -35,6 +36,7 @@ import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.List;
@@ -50,6 +52,12 @@ public class KTableFilterTest {
     private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new
StringSerializer(), new IntegerSerializer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
 
+    @Before
+    public void setUp() {
+        // disable caching at the config level
+        props.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
+    }
+
     private final Predicate<String, Integer> predicate = (key, value) -> (value
% 2) == 0;
 
     private void doTestKTable(final StreamsBuilder builder,
@@ -76,31 +84,34 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testKTable() {
+    public void shouldPassThroughWithoutMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final String topic1 = "topic1";
 
         final KTable<String, Integer> table1 = builder.table(topic1, consumed);
-
         final KTable<String, Integer> table2 = table1.filter(predicate);
         final KTable<String, Integer> table3 = table1.filterNot(predicate);
 
+        assertNull(table1.queryableStoreName());
+        assertNull(table2.queryableStoreName());
+        assertNull(table3.queryableStoreName());
+
         doTestKTable(builder, table2, table3, topic1);
     }
 
     @Test
-    public void testQueryableKTable() {
+    public void shouldPassThroughOnMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final String topic1 = "topic1";
 
         final KTable<String, Integer> table1 = builder.table(topic1, consumed);
-
-        final KTable<String, Integer> table2 = table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
+        final KTable<String, Integer> table2 = table1.filter(predicate, Materialized.as("store2"));
         final KTable<String, Integer> table3 = table1.filterNot(predicate);
 
-        assertEquals("anyStoreNameFilter", table2.queryableStoreName());
+        assertNull(table1.queryableStoreName());
+        assertEquals("store2", table2.queryableStoreName());
         assertNull(table3.queryableStoreName());
 
         doTestKTable(builder, table2, table3, topic1);
@@ -175,7 +186,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testQueryableValueGetter() {
+    public void shouldGetValuesOnMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final String topic1 = "topic1";
@@ -189,6 +200,7 @@ public class KTableFilterTest {
         final KTableImpl<String, Integer, Integer> table4 =
             (KTableImpl<String, Integer, Integer>) table1.filterNot(predicate);
 
+        assertNull(table1.queryableStoreName());
         assertEquals("store2", table2.queryableStoreName());
         assertEquals("store3", table3.queryableStoreName());
         assertNull(table4.queryableStoreName());
@@ -237,7 +249,7 @@ public class KTableFilterTest {
 
 
     @Test
-    public void testNotSendingOldValue() {
+    public void shouldNotSendOldValuesWithoutMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final String topic1 = "topic1";
@@ -250,7 +262,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testQueryableNotSendingOldValue() {
+    public void shouldNotSendOldValuesOnMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final String topic1 = "topic1";
@@ -258,7 +270,7 @@ public class KTableFilterTest {
         final KTableImpl<String, Integer, Integer> table1 =
             (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         final KTableImpl<String, Integer, Integer> table2 =
-            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));
 
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
@@ -306,7 +318,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testSendingOldValue() {
+    public void shouldSendOldValuesWhenEnabledWithoutMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final String topic1 = "topic1";
@@ -320,7 +332,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testQueryableSendingOldValue() {
+    public void shouldSendOldValuesWhenEnabledOnMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final String topic1 = "topic1";
@@ -328,7 +340,7 @@ public class KTableFilterTest {
         final KTableImpl<String, Integer, Integer> table1 =
             (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         final KTableImpl<String, Integer, Integer> table2 =
-            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
+            (KTableImpl<String, Integer, Integer>) table1.filter(predicate, Materialized.as("store2"));
 
         doTestSendingOldValue(builder, table1, table2, topic1);
     }
@@ -357,7 +369,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testSkipNullOnMaterialization() {
+    public void shouldSkipNullToRepartitionWithoutMaterialization() {
         // Do not explicitly set enableSendingOldValues. Let a further downstream stateful
operator trigger it instead.
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -375,7 +387,7 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testQueryableSkipNullOnMaterialization() {
+    public void shouldSkipNullToRepartitionOnMaterialization() {
         // Do not explicitly set enableSendingOldValues. Let a further downstream stateful
operator trigger it instead.
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -385,7 +397,7 @@ public class KTableFilterTest {
         final KTableImpl<String, String, String> table1 =
             (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         final KTableImpl<String, String, String> table2 =
-            (KTableImpl<String, String, String>) table1.filter((key, value) -> value.equalsIgnoreCase("accept"),
Materialized.as("anyStoreNameFilter"))
+            (KTableImpl<String, String, String>) table1.filter((key, value) -> value.equalsIgnoreCase("accept"),
Materialized.as("store2"))
                 .groupBy(MockMapper.noOpKeyValueMapper())
                 .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.as("mock-result"));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 9d1284e..ad87b60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -41,6 +41,8 @@ import java.util.Map;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
@@ -150,6 +152,8 @@ public abstract class AbstractKeyValueStoreTest {
         assertNull(store.get(3));
         assertEquals("four", store.get(4));
         assertEquals("five", store.get(5));
+        // Flush now so that for caching store, we will not skip the deletion following a
put
+        store.flush();
         store.delete(5);
         assertEquals(4, driver.sizeOf(store));
 
@@ -159,13 +163,13 @@ public abstract class AbstractKeyValueStoreTest {
         assertEquals("one", driver.flushedEntryStored(1));
         assertEquals("two", driver.flushedEntryStored(2));
         assertEquals("four", driver.flushedEntryStored(4));
-        assertEquals(null, driver.flushedEntryStored(5));
+        assertNull(driver.flushedEntryStored(5));
 
-        assertEquals(false, driver.flushedEntryRemoved(0));
-        assertEquals(false, driver.flushedEntryRemoved(1));
-        assertEquals(false, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
+        assertFalse(driver.flushedEntryRemoved(0));
+        assertFalse(driver.flushedEntryRemoved(1));
+        assertFalse(driver.flushedEntryRemoved(2));
+        assertFalse(driver.flushedEntryRemoved(4));
+        assertTrue(driver.flushedEntryRemoved(5));
 
         final HashMap<Integer, String> expectedContents = new HashMap<>();
         expectedContents.put(2, "two");
@@ -196,6 +200,7 @@ public abstract class AbstractKeyValueStoreTest {
         assertNull(store.get(3));
         assertEquals("four", store.get(4));
         assertEquals("five", store.get(5));
+        store.flush();
         store.delete(5);
 
         // Flush the store and verify all current entries were properly flushed ...
@@ -204,13 +209,13 @@ public abstract class AbstractKeyValueStoreTest {
         assertEquals("one", driver.flushedEntryStored(1));
         assertEquals("two", driver.flushedEntryStored(2));
         assertEquals("four", driver.flushedEntryStored(4));
-        assertEquals(null, driver.flushedEntryStored(5));
+        assertNull(null, driver.flushedEntryStored(5));
 
-        assertEquals(false, driver.flushedEntryRemoved(0));
-        assertEquals(false, driver.flushedEntryRemoved(1));
-        assertEquals(false, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
+        assertFalse(driver.flushedEntryRemoved(0));
+        assertFalse(driver.flushedEntryRemoved(1));
+        assertFalse(driver.flushedEntryRemoved(2));
+        assertFalse(driver.flushedEntryRemoved(4));
+        assertTrue(driver.flushedEntryRemoved(5));
     }
 
     @Test
@@ -278,10 +283,10 @@ public abstract class AbstractKeyValueStoreTest {
         assertEquals("two", driver.flushedEntryStored(2));
         assertEquals("four", driver.flushedEntryStored(4));
 
-        assertEquals(false, driver.flushedEntryRemoved(0));
-        assertEquals(false, driver.flushedEntryRemoved(1));
-        assertEquals(false, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(4));
+        assertFalse(driver.flushedEntryRemoved(0));
+        assertFalse(driver.flushedEntryRemoved(1));
+        assertFalse(driver.flushedEntryRemoved(2));
+        assertFalse(driver.flushedEntryRemoved(4));
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 3ac05a0..7b8957e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -38,7 +38,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -104,6 +103,23 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest
{
     }
 
     @Test
+    public void shouldAvoidFlushingDeletionsWithoutDirtyKeys() {
+        final int added = addItemsToCache();
+        // all dirty entries should have been flushed
+        assertEquals(added, underlyingStore.approximateNumEntries());
+        assertEquals(added, cacheFlushListener.forwarded.size());
+
+        store.put(bytesKey("key"), bytesValue("value"));
+        assertEquals(added, underlyingStore.approximateNumEntries());
+        assertEquals(added, cacheFlushListener.forwarded.size());
+
+        store.put(bytesKey("key"), null);
+        store.flush();
+        assertEquals(added, underlyingStore.approximateNumEntries());
+        assertEquals(added, cacheFlushListener.forwarded.size());
+    }
+
+    @Test
     public void shouldCloseAfterErrorWithFlush() {
         try {
             cache = EasyMock.niceMock(ThreadCache.class);
@@ -139,7 +155,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     }
 
     @Test
-    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws IOException {
+    public void shouldFlushEvictedItemsIntoUnderlyingStore() {
         final int added = addItemsToCache();
         // all dirty entries should have been flushed
         assertEquals(added, underlyingStore.approximateNumEntries());
@@ -148,7 +164,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     }
 
     @Test
-    public void shouldForwardDirtyItemToListenerWhenEvicted() throws IOException {
+    public void shouldForwardDirtyItemToListenerWhenEvicted() {
         final int numRecords = addItemsToCache();
         assertEquals(numRecords, cacheFlushListener.forwarded.size());
     }
@@ -166,24 +182,51 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         store.setFlushListener(cacheFlushListener, true);
         store.put(bytesKey("1"), bytesValue("a"));
         store.flush();
+        assertEquals("a", cacheFlushListener.forwarded.get("1").newValue);
+        assertNull(cacheFlushListener.forwarded.get("1").oldValue);
         store.put(bytesKey("1"), bytesValue("b"));
+        store.put(bytesKey("1"), bytesValue("c"));
         store.flush();
-        assertEquals("b", cacheFlushListener.forwarded.get("1").newValue);
+        assertEquals("c", cacheFlushListener.forwarded.get("1").newValue);
         assertEquals("a", cacheFlushListener.forwarded.get("1").oldValue);
+        store.put(bytesKey("1"), null);
+        store.flush();
+        assertNull(cacheFlushListener.forwarded.get("1").newValue);
+        assertEquals("c", cacheFlushListener.forwarded.get("1").oldValue);
+        cacheFlushListener.forwarded.clear();
+        store.put(bytesKey("1"), bytesValue("a"));
+        store.put(bytesKey("1"), bytesValue("b"));
+        store.put(bytesKey("1"), null);
+        store.flush();
+        assertNull(cacheFlushListener.forwarded.get("1"));
+        cacheFlushListener.forwarded.clear();
     }
 
     @Test
     public void shouldNotForwardOldValuesWhenDisabled() {
         store.put(bytesKey("1"), bytesValue("a"));
         store.flush();
+        assertEquals("a", cacheFlushListener.forwarded.get("1").newValue);
+        assertNull(cacheFlushListener.forwarded.get("1").oldValue);
         store.put(bytesKey("1"), bytesValue("b"));
         store.flush();
         assertEquals("b", cacheFlushListener.forwarded.get("1").newValue);
         assertNull(cacheFlushListener.forwarded.get("1").oldValue);
+        store.put(bytesKey("1"), null);
+        store.flush();
+        assertNull(cacheFlushListener.forwarded.get("1").newValue);
+        assertNull(cacheFlushListener.forwarded.get("1").oldValue);
+        cacheFlushListener.forwarded.clear();
+        store.put(bytesKey("1"), bytesValue("a"));
+        store.put(bytesKey("1"), bytesValue("b"));
+        store.put(bytesKey("1"), null);
+        store.flush();
+        assertNull(cacheFlushListener.forwarded.get("1"));
+        cacheFlushListener.forwarded.clear();
     }
 
     @Test
-    public void shouldIterateAllStoredItems() throws IOException {
+    public void shouldIterateAllStoredItems() {
         final int items = addItemsToCache();
         final KeyValueIterator<Bytes, byte[]> all = store.all();
         final List<Bytes> results = new ArrayList<>();
@@ -194,7 +237,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     }
 
     @Test
-    public void shouldIterateOverRange() throws IOException {
+    public void shouldIterateOverRange() {
         final int items = addItemsToCache();
         final KeyValueIterator<Bytes, byte[]> range = store.range(bytesKey(String.valueOf(0)), bytesKey(String.valueOf(items)));
         final List<Bytes> results = new ArrayList<>();
@@ -324,7 +367,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         store.delete(bytesKey("key"));
     }
 
-    private int addItemsToCache() throws IOException {
+    private int addItemsToCache() {
         int cachedSize = 0;
         int i = 0;
         while (cachedSize < maxCacheSizeBytes) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 36426d4..a6f0a71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -44,12 +44,15 @@ import java.util.Random;
 import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 
 @SuppressWarnings("PointlessArithmeticExpression")
 public class CachingSessionStoreTest {
@@ -57,17 +60,17 @@ public class CachingSessionStoreTest {
     private static final int MAX_CACHE_SIZE_BYTES = 600;
     private static final Long DEFAULT_TIMESTAMP = 10L;
     private static final long SEGMENT_INTERVAL = 100L;
-    private RocksDBSegmentedBytesStore underlying;
-    private CachingSessionStore<String, String> cachingStore;
-    private ThreadCache cache;
     private final Bytes keyA = Bytes.wrap("a".getBytes());
     private final Bytes keyAA = Bytes.wrap("aa".getBytes());
     private final Bytes keyB = Bytes.wrap("b".getBytes());
 
+    private CachingSessionStore<String, String> cachingStore;
+    private ThreadCache cache;
+
     @Before
     public void setUp() {
         final SessionKeySchema schema = new SessionKeySchema();
-        underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema);
+        final RocksDBSegmentedBytesStore underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema);
         final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
         cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
@@ -174,12 +177,15 @@ public class CachingSessionStoreTest {
         final Windowed<Bytes> b = new Windowed<>(keyB, new SessionWindow(0, 0));
         cachingStore.put(a, "2".getBytes());
         cachingStore.put(b, "2".getBytes());
-        cachingStore.flush();
         cachingStore.remove(a);
-        cachingStore.flush();
+
         final KeyValueIterator<Windowed<Bytes>, byte[]> rangeIter =
             cachingStore.findSessions(keyA, 0, 0);
         assertFalse(rangeIter.hasNext());
+
+        assertNull(cachingStore.fetchSession(keyA, 0, 0));
+        assertThat(cachingStore.fetchSession(keyB, 0, 0), equalTo("2".getBytes()));
+
     }
 
     @Test
@@ -211,7 +217,6 @@ public class CachingSessionStoreTest {
         cachingStore.put(a2, "2".getBytes());
         cachingStore.put(a3, "3".getBytes());
         cachingStore.put(aa3, "3".getBytes());
-        cachingStore.flush();
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults =
             cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
@@ -270,10 +275,21 @@ public class CachingSessionStoreTest {
             flushed
         );
         flushed.clear();
+
+        cachingStore.put(a, "1".getBytes());
+        cachingStore.put(a, "2".getBytes());
+        cachingStore.remove(a);
+        cachingStore.flush();
+
+        assertEquals(
+            Collections.emptyList(),
+            flushed
+        );
+        flushed.clear();
     }
 
     @Test
-    public void shouldForwardChangedValuesDuringFlushWhenSendOldValuesDisabledNewRecordIsNull() {
+    public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
         final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
         final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
         final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
@@ -292,38 +308,25 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
 
         assertEquals(
-            flushed,
             Arrays.asList(
                 KeyValue.pair(aDeserialized, new Change<>("1", null)),
                 KeyValue.pair(aDeserialized, new Change<>("2", null)),
-                KeyValue.pair(aDeserialized, new Change<>(null, "2"))
-            )
-        );
-    }
-
-    @Test
-    public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
-        final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
-        final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
-        final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(
-            (key, newValue, oldValue, timestamp) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
-            false
+                KeyValue.pair(aDeserialized, new Change<>(null, null))
+            ),
+            flushed
         );
+        flushed.clear();
 
         cachingStore.put(a, "1".getBytes());
-        cachingStore.flush();
-
         cachingStore.put(a, "2".getBytes());
+        cachingStore.remove(a);
         cachingStore.flush();
 
         assertEquals(
-            flushed,
-            Arrays.asList(
-                KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                KeyValue.pair(aDeserialized, new Change<>("2", null))
-            )
+            Collections.emptyList(),
+            flushed
         );
+        flushed.clear();
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 610fc60..cb3fcd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -345,11 +345,26 @@ public class CachingWindowStoreTest {
         final Windowed<String> windowedKey =
             new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
         cachingStore.put(bytesKey("1"), bytesValue("a"));
-        cachingStore.flush();
         cachingStore.put(bytesKey("1"), bytesValue("b"));
         cachingStore.flush();
         assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
-        assertEquals("a", cacheListener.forwarded.get(windowedKey).oldValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cacheListener.forwarded.clear();
+        cachingStore.put(bytesKey("1"), bytesValue("c"));
+        cachingStore.flush();
+        assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
+        assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue);
+        cachingStore.put(bytesKey("1"), null);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey).newValue);
+        assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue);
+        cacheListener.forwarded.clear();
+        cachingStore.put(bytesKey("1"), bytesValue("a"));
+        cachingStore.put(bytesKey("1"), bytesValue("b"));
+        cachingStore.put(bytesKey("1"), null);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey));
+        cacheListener.forwarded.clear();
     }
 
     @Test
@@ -357,11 +372,25 @@ public class CachingWindowStoreTest {
         final Windowed<String> windowedKey =
             new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
         cachingStore.put(bytesKey("1"), bytesValue("a"));
-        cachingStore.flush();
         cachingStore.put(bytesKey("1"), bytesValue("b"));
         cachingStore.flush();
         assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
         assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cachingStore.put(bytesKey("1"), bytesValue("c"));
+        cachingStore.flush();
+        assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cachingStore.put(bytesKey("1"), null);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cacheListener.forwarded.clear();
+        cachingStore.put(bytesKey("1"), bytesValue("a"));
+        cachingStore.put(bytesKey("1"), bytesValue("b"));
+        cachingStore.put(bytesKey("1"), null);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey));
+        cacheListener.forwarded.clear();
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 802df8d..3653e7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -32,11 +32,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.apache.kafka.test.StreamsTestUtils.valuesToList;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -272,20 +273,5 @@ public class RocksDBSessionStoreTest {
         sessionStore.put(null, 1L);
     }
     
-    static <K, V> List<KeyValue<Windowed<K>, V>> toList(final KeyValueIterator<Windowed<K>, V> iterator) {
-        final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
-        while (iterator.hasNext()) {
-            results.add(iterator.next());
-        }
-        return results;
-    }
-
-    private static <K, V> List<V> valuesToList(final KeyValueIterator<Windowed<K>, V> iterator) {
-        final List<V> results = new ArrayList<>();
-        while (iterator.hasNext()) {
-            results.add(iterator.next().value);
-        }
-        return results;
-    }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 1dcebb5..87a693d 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -88,6 +88,15 @@ public final class StreamsTestUtils {
         return results;
     }
 
+    public static <K, V> List<V> valuesToList(final Iterator<KeyValue<K, V>> iterator) {
+        final List<V> results = new ArrayList<>();
+
+        while (iterator.hasNext()) {
+            results.add(iterator.next().value);
+        }
+        return results;
+    }
+
     public static <K> void verifyKeyValueList(final List<KeyValue<K, byte[]>> expected, final List<KeyValue<K, byte[]>> actual) {
         assertThat(actual.size(), equalTo(expected.size()));
         for (int i = 0; i < actual.size(); i++) {


Mime
View raw message