From: guozhang@apache.org
To: commits@kafka.apache.org
Date: Wed, 13 Feb 2019 06:36:51 +0000
Subject: [kafka] branch trunk updated: KAFKA-7652: Part III; Put to underlying before Flush (#6191)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a1c269  KAFKA-7652: Part III; Put to underlying before Flush (#6191)
0a1c269 is described below

commit 0a1c26934757afae4dce49ff3ee038311ca6dd4a
Author: Guozhang Wang
AuthorDate: Tue Feb 12 22:36:40 2019 -0800

    KAFKA-7652: Part III; Put to underlying before Flush (#6191)

    1. In the caching layer's flush-listener call, we should always write to the
       underlying store before flushing downstream (see point 4 of #4331 for the
       detailed explanation). The fix for #4331 only touched the KV store, but it
       turns out the window and session stores need the same fix as well.

    2. Also apply the optimization that the session store already had: when the
       new value bytes and the old value bytes are both null (possible, e.g., when
       a put(K, V) is followed by a remove(K) or put(K, null) and both operations
       only hit the cache), the underlying store does not hold this value at all
       upon flushing, and no intermediate value has been sent downstream either.
       We can therefore skip both putting a null to the underlying store and
       calling the flush listener.

    Modifies corresponding unit tests.

    Reviewers: John Roesler, Matthias J. Sax, Bill Bejeck
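Before the patch itself, a compact illustration of the eviction path that all
three caching stores now share. This is a minimal sketch rather than Kafka's
actual API: the class name FlushOrderingSketch, the FlushListener interface,
and the plain java.util.Map standing in for the underlying RocksDB store are
hypothetical stand-ins; only the control flow (read the old value, skip on
null/null, put to the underlying store before forwarding) mirrors the commit.

import java.util.HashMap;
import java.util.Map;

public class FlushOrderingSketch {

    @FunctionalInterface
    interface FlushListener {
        void apply(String key, byte[] newValue, byte[] oldValue, long timestamp);
    }

    private final Map<String, byte[]> underlying = new HashMap<>();
    private final FlushListener flushListener;  // null when nothing is registered downstream
    private final boolean sendOldValues;

    FlushOrderingSketch(final FlushListener flushListener, final boolean sendOldValues) {
        this.flushListener = flushListener;
        this.sendOldValues = sendOldValues;
    }

    // Called once per dirty cache entry when the cache flushes or evicts.
    void putAndMaybeForward(final String key, final byte[] newValueBytes, final long timestamp) {
        if (flushListener != null) {
            // Read the old value first: it is needed either to forward old values
            // downstream (sendOldValues) or to detect the null/null case below.
            final byte[] oldValueBytes =
                newValueBytes == null || sendOldValues ? underlying.get(key) : null;

            // The optimization from point 2: if the key exists neither in the cache
            // nor in the underlying store, there is nothing to delete and nothing was
            // ever forwarded, so skip both the store write and the listener call.
            if (newValueBytes != null || oldValueBytes != null) {
                // Point 1: write to the underlying store BEFORE calling the listener,
                // so anything triggered downstream reads the new value, not a stale one.
                putToUnderlying(key, newValueBytes);
                flushListener.apply(key, newValueBytes,
                                    sendOldValues ? oldValueBytes : null, timestamp);
            }
        } else {
            putToUnderlying(key, newValueBytes);
        }
    }

    // The real stores rely on the underlying store to treat null value bytes as a
    // delete; mirror that behavior for the Map stand-in.
    private void putToUnderlying(final String key, final byte[] value) {
        if (value == null) {
            underlying.remove(key);
        } else {
            underlying.put(key, value);
        }
    }
}

With this ordering, a put(K, V) followed by put(K, null) that never leaves the
cache flushes as a no-op, which is what the new
shouldAvoidFlushingDeletionsWithoutDirtyKeys test in the patch below asserts.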
---
 .../state/internals/CachingKeyValueStore.java      | 44 ++++++++------
 .../state/internals/CachingSessionStore.java       | 37 +++++++-----
 .../state/internals/CachingWindowStore.java        | 68 +++++++++++-----------
 .../streams/state/internals/SessionKeySchema.java  |  7 +++
 .../streams/state/internals/WindowKeySchema.java   | 11 +++-
 .../integration/QueryableStateIntegrationTest.java |  2 +-
 .../kstream/internals/KTableFilterTest.java        | 44 +++++++-----
 .../state/internals/AbstractKeyValueStoreTest.java | 37 +++++++-----
 .../state/internals/CachingKeyValueStoreTest.java  | 57 +++++++++++++++---
 .../state/internals/CachingSessionStoreTest.java   | 63 ++++++++++----------
 .../state/internals/CachingWindowStoreTest.java    | 35 ++++++++++-
 .../state/internals/RocksDBSessionStoreTest.java   | 18 +-----
 .../org/apache/kafka/test/StreamsTestUtils.java    |  9 +++
 13 files changed, 274 insertions(+), 158 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 9ecae5a..992466c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -85,27 +85,33 @@ class CachingKeyValueStore extends WrappedStateStore.AbstractStateStore im
 
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
                                     final InternalProcessorContext context) {
-        final ProcessorRecordContext current = context.recordContext();
-        try {
-            context.setRecordContext(entry.entry().context());
-            if (flushListener != null) {
-                V oldValue = null;
-                if (sendOldValues) {
-                    final byte[] oldBytesValue = underlying.get(entry.key());
-                    oldValue = oldBytesValue == null ? null : serdes.valueFrom(oldBytesValue);
-                }
-                // we rely on underlying store to handle null new value bytes as deletes
-                underlying.put(entry.key(), entry.newValue());
-                flushListener.apply(
-                    serdes.keyFrom(entry.key().get()),
-                    serdes.valueFrom(entry.newValue()),
-                    oldValue,
-                    entry.entry().context().timestamp());
-            } else {
+        if (flushListener != null) {
+            final byte[] newValueBytes = entry.newValue();
+            final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? underlying.get(entry.key()) : null;
+
+            // this is an optimization: if this key did not exist in underlying store and also not in the cache,
+            // we can skip flushing to downstream as well as writing to underlying store
+            if (newValueBytes != null || oldValueBytes != null) {
+                final K key = serdes.keyFrom(entry.key().get());
+                final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null;
+                final V oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null;
+                // we need to get the old values if needed, and then put to store, and then flush
+                underlying.put(entry.key(), entry.newValue());
+
+                final ProcessorRecordContext current = context.recordContext();
+                context.setRecordContext(entry.entry().context());
+                try {
+                    flushListener.apply(
+                        key,
+                        newValue,
+                        oldValue,
+                        entry.entry().context().timestamp());
+                } finally {
+                    context.setRecordContext(current);
+                }
             }
-        } finally {
-            context.setRecordContext(current);
+        } else {
+            underlying.put(entry.key(), entry.newValue());
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index a07f1ce..1c5c2f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -72,7 +72,6 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i
                   keySerde == null ? (Serde) context.keySerde() : keySerde,
                   aggSerde == null ? (Serde) context.valueSerde() : aggSerde);
 
-
         cacheName = context.taskId() + "-" + bytesStore.name();
         cache = context.getCache();
         cache.addDirtyEntryFlushListener(cacheName, entries -> {
@@ -178,27 +177,35 @@ class CachingSessionStore extends WrappedStateStore.AbstractStateStore i
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
                                     final InternalProcessorContext context) {
         final Bytes binaryKey = cacheFunction.key(entry.key());
-        final ProcessorRecordContext current = context.recordContext();
-        context.setRecordContext(entry.entry().context());
-        try {
-            final Windowed key = SessionKeySchema.from(binaryKey.get(), serdes.keyDeserializer(), topic);
-            final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
-            if (flushListener != null) {
-                final AGG newValue = serdes.valueFrom(entry.newValue());
-                final AGG oldValue = newValue == null || sendOldValues ?
-                    serdes.valueFrom(bytesStore.fetchSession(rawKey, key.window().start(), key.window().end())) :
-                    null;
-                if (!(newValue == null && oldValue == null)) {
+        final Windowed bytesKey = SessionKeySchema.from(binaryKey);
+        if (flushListener != null) {
+            final byte[] newValueBytes = entry.newValue();
+            final byte[] oldValueBytes = newValueBytes == null || sendOldValues ?
+                bytesStore.fetchSession(bytesKey.key(), bytesKey.window().start(), bytesKey.window().end()) : null;
+
+            // this is an optimization: if this key did not exist in underlying store and also not in the cache,
+            // we can skip flushing to downstream as well as writing to underlying store
+            if (newValueBytes != null || oldValueBytes != null) {
+                final Windowed key = SessionKeySchema.from(bytesKey, serdes.keyDeserializer(), topic);
+                final AGG newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null;
+                final AGG oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null;
+                // we need to get the old values if needed, and then put to store, and then flush
+                bytesStore.put(bytesKey, entry.newValue());
+
+                final ProcessorRecordContext current = context.recordContext();
+                context.setRecordContext(entry.entry().context());
+                try {
                     flushListener.apply(
                         key,
                         newValue,
                         oldValue,
                         entry.entry().context().timestamp());
+                } finally {
+                    context.setRecordContext(current);
                 }
             }
-            bytesStore.put(new Windowed<>(rawKey, key.window()), entry.newValue());
-        } finally {
-            context.setRecordContext(current);
+        } else {
+            bytesStore.put(bytesKey, entry.newValue());
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 6112544..53d02dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -64,13 +64,13 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        initInternal(context);
+        initInternal((InternalProcessorContext) context);
         underlying.init(context, root);
     }
 
     @SuppressWarnings("unchecked")
-    private void initInternal(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+    private void initInternal(final InternalProcessorContext context) {
+        this.context = context;
         final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name());
         serdes = new StateSerdes<>(topic,
                                    keySerde == null ? (Serde) context.keySerde() : keySerde,
@@ -84,34 +84,44 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl
         cache.addDirtyEntryFlushListener(name, entries -> {
             for (final ThreadCache.DirtyEntry entry : entries) {
-                final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
-                final long timestamp = WindowKeySchema.extractStoreTimestamp(binaryWindowKey);
-
-                final Windowed windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey, windowSize, serdes.keyDeserializer(), serdes.topic());
-                final Bytes key = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(binaryWindowKey));
-                maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
-                underlying.put(key, entry.newValue(), timestamp);
+                putAndMaybeForward(entry, context);
             }
         });
     }
 
-    private void maybeForward(final ThreadCache.DirtyEntry entry,
-                              final Bytes key,
-                              final Windowed windowedKey,
-                              final InternalProcessorContext context) {
+    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
+                                    final InternalProcessorContext context) {
+        final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
+        final Windowed windowedKeyBytes = WindowKeySchema.fromStoreBytesKey(binaryWindowKey, windowSize);
+        final long windowStartTimestamp = windowedKeyBytes.window().start();
+        final Bytes key = windowedKeyBytes.key();
         if (flushListener != null) {
-            final ProcessorRecordContext current = context.recordContext();
-            context.setRecordContext(entry.entry().context());
-            try {
-                final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
-                flushListener.apply(
-                    windowedKey,
-                    serdes.valueFrom(entry.newValue()),
-                    oldValue,
-                    entry.entry().context().timestamp());
-            } finally {
-                context.setRecordContext(current);
+            final byte[] newValueBytes = entry.newValue();
+            final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? underlying.fetch(key, windowStartTimestamp) : null;
+
+            // this is an optimization: if this key did not exist in underlying store and also not in the cache,
+            // we can skip flushing to downstream as well as writing to underlying store
+            if (newValueBytes != null || oldValueBytes != null) {
+                final Windowed windowedKey = WindowKeySchema.fromStoreKey(windowedKeyBytes, serdes.keyDeserializer(), serdes.topic());
+                final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null;
+                final V oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null;
+                // we need to get the old values if needed, and then put to store, and then flush
+                underlying.put(key, entry.newValue(), windowStartTimestamp);
+
+                final ProcessorRecordContext current = context.recordContext();
+                context.setRecordContext(entry.entry().context());
+                try {
+                    flushListener.apply(
+                        windowedKey,
+                        newValue,
+                        oldValue,
+                        entry.entry().context().timestamp());
+                } finally {
+                    context.setRecordContext(current);
+                }
             }
+        } else {
+            underlying.put(key, entry.newValue(), windowStartTimestamp);
         }
     }
 
@@ -231,14 +241,6 @@ class CachingWindowStore extends WrappedStateStore.AbstractStateStore impl
         );
     }
 
-    private V fetchPrevious(final Bytes key, final long timestamp) {
-        final byte[] value = underlying.fetch(key, timestamp);
-        if (value != null) {
-            return serdes.valueFrom(value);
-        }
-        return null;
-    }
-
     @Override
     public KeyValueIterator, byte[]> all() {
         validateStoreOpen();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 94f08ae..0c80da4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -132,6 +132,13 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
         return new Windowed<>(Bytes.wrap(extractKeyBytes(binaryKey)), window);
     }
 
+    public static Windowed from(final Windowed keyBytes,
+                                final Deserializer keyDeserializer,
+                                final String topic) {
+        final K key = keyDeserializer.deserialize(topic, keyBytes.key().get());
+        return new Windowed<>(key, keyBytes.window());
+    }
+
     public static byte[] toBinary(final Windowed sessionKey,
                                   final Serializer serializer,
                                   final String topic) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index f960b01..dd8a2f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -206,8 +206,15 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
         return new Windowed<>(key, window);
     }
 
-    static Windowed fromStoreBytesKey(final byte[] binaryKey,
-                                      final long windowSize) {
+    public static Windowed fromStoreKey(final Windowed windowedKey,
+                                        final Deserializer deserializer,
+                                        final String topic) {
+        final K key = deserializer.deserialize(topic, windowedKey.key().get());
+        return new Windowed<>(key, windowedKey.window());
+    }
+
+    public static Windowed fromStoreBytesKey(final byte[] binaryKey,
+                                             final long windowSize) {
         final Bytes key = Bytes.wrap(extractStoreKeyBytes(binaryKey));
         final Window window = extractStoreWindow(binaryKey, windowSize);
         return new Windowed<>(key, window);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 856a85d..6889525 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -519,7 +519,7 @@ public class QueryableStateIntegrationTest {
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
 
-        waitUntilAtLeastNumRecordProcessed(outputTopic, 2);
+        waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
         final ReadOnlyKeyValueStore myFilterStore = kafkaStreams.store("queryFilter", QueryableStoreTypes.keyValueStore());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 8474565..1c4ba46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyTestDriverWrapper;
@@ -35,6 +36,7 @@ import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.List;
@@ -50,6 +52,12 @@ public class KTableFilterTest {
     private final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
 
+    @Before
+    public void setUp() {
+        // disable caching at the config level
+        props.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
+    }
+
     private final Predicate predicate = (key, value) -> (value % 2) == 0;
 
     private void doTestKTable(final StreamsBuilder builder,
@@ -76,31 +84,34 @@ public class KTableFilterTest {
     }
 
     @Test
-    public void testKTable() {
+    public void shouldPassThroughWithoutMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
         final KTable table1 = builder.table(topic1, consumed);
-
         final KTable table2 = table1.filter(predicate);
         final KTable table3 = table1.filterNot(predicate);
 
+        assertNull(table1.queryableStoreName());
+        assertNull(table2.queryableStoreName());
+        assertNull(table3.queryableStoreName());
+
         doTestKTable(builder, table2, table3, topic1);
     }
 
     @Test
-    public void testQueryableKTable() {
+    public void shouldPassThroughOnMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
         final KTable table1 = builder.table(topic1, consumed);
-
-        final KTable table2 = table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
+        final KTable table2 = table1.filter(predicate, Materialized.as("store2"));
         final KTable table3 = table1.filterNot(predicate);
 
-        assertEquals("anyStoreNameFilter", table2.queryableStoreName());
+        assertNull(table1.queryableStoreName());
+        assertEquals("store2", table2.queryableStoreName());
         assertNull(table3.queryableStoreName());
 
         doTestKTable(builder, table2, table3, topic1);
@@ -175,7 +186,7 @@
     }
 
     @Test
-    public void testQueryableValueGetter() {
+    public void shouldGetValuesOnMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
@@ -189,6 +200,7 @@
         final KTableImpl table4 =
             (KTableImpl) table1.filterNot(predicate);
 
+        assertNull(table1.queryableStoreName());
         assertEquals("store2", table2.queryableStoreName());
         assertEquals("store3", table3.queryableStoreName());
         assertNull(table4.queryableStoreName());
@@ -237,7 +249,7 @@
     }
 
     @Test
-    public void testNotSendingOldValue() {
+    public void shouldNotSendOldValuesWithoutMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
@@ -250,7 +262,7 @@
     }
 
     @Test
-    public void testQueryableNotSendingOldValue() {
+    public void shouldNotSendOldValuesOnMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
@@ -258,7 +270,7 @@
         final KTableImpl table1 =
             (KTableImpl) builder.table(topic1, consumed);
         final KTableImpl table2 =
-            (KTableImpl) table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
+            (KTableImpl) table1.filter(predicate, Materialized.as("store2"));
 
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
@@ -306,7 +318,7 @@
     }
 
     @Test
-    public void testSendingOldValue() {
+    public void shouldSendOldValuesWhenEnabledWithoutMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
@@ -320,7 +332,7 @@
     }
 
     @Test
-    public void testQueryableSendingOldValue() {
+    public void shouldSendOldValuesWhenEnabledOnMaterialization() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic1 = "topic1";
 
@@ -328,7 +340,7 @@
         final KTableImpl table1 =
             (KTableImpl) builder.table(topic1, consumed);
         final KTableImpl table2 =
-            (KTableImpl) table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
+            (KTableImpl) table1.filter(predicate, Materialized.as("store2"));
 
         doTestSendingOldValue(builder, table1, table2, topic1);
     }
@@ -357,7 +369,7 @@
     }
 
     @Test
-    public void testSkipNullOnMaterialization() {
+    public void shouldSkipNullToRepartitionWithoutMaterialization() {
         // Do not explicitly set enableSendingOldValues. Let a further downstream stateful operator trigger it instead.
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -375,7 +387,7 @@
     }
 
     @Test
-    public void testQueryableSkipNullOnMaterialization() {
+    public void shouldSkipNullToRepartitionOnMaterialization() {
         // Do not explicitly set enableSendingOldValues. Let a further downstream stateful operator trigger it instead.
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -385,7 +397,7 @@
         final KTableImpl table1 =
             (KTableImpl) builder.table(topic1, consumed);
         final KTableImpl table2 =
-            (KTableImpl) table1.filter((key, value) -> value.equalsIgnoreCase("accept"), Materialized.as("anyStoreNameFilter"))
+            (KTableImpl) table1.filter((key, value) -> value.equalsIgnoreCase("accept"), Materialized.as("store2"))
                 .groupBy(MockMapper.noOpKeyValueMapper())
                 .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.as("mock-result"));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 9d1284e..ad87b60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -41,6 +41,8 @@ import java.util.Map;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
@@ -150,6 +152,8 @@ public abstract class AbstractKeyValueStoreTest {
         assertNull(store.get(3));
         assertEquals("four", store.get(4));
         assertEquals("five", store.get(5));
+        // Flush now so that for caching store, we will not skip the deletion following an put
+        store.flush();
         store.delete(5);
 
         assertEquals(4, driver.sizeOf(store));
@@ -159,13 +163,13 @@
         assertEquals("one", driver.flushedEntryStored(1));
         assertEquals("two", driver.flushedEntryStored(2));
         assertEquals("four", driver.flushedEntryStored(4));
-        assertEquals(null, driver.flushedEntryStored(5));
+        assertNull(driver.flushedEntryStored(5));
 
-        assertEquals(false, driver.flushedEntryRemoved(0));
-        assertEquals(false, driver.flushedEntryRemoved(1));
-        assertEquals(false, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
+        assertFalse(driver.flushedEntryRemoved(0));
+        assertFalse(driver.flushedEntryRemoved(1));
+        assertFalse(driver.flushedEntryRemoved(2));
+        assertFalse(driver.flushedEntryRemoved(4));
+        assertTrue(driver.flushedEntryRemoved(5));
 
         final HashMap expectedContents = new HashMap<>();
         expectedContents.put(2, "two");
@@ -196,6 +200,7 @@
         assertNull(store.get(3));
         assertEquals("four", store.get(4));
         assertEquals("five", store.get(5));
+        store.flush();
         store.delete(5);
 
         // Flush the store and verify all current entries were properly flushed ...
@@ -204,13 +209,13 @@
         assertEquals("one", driver.flushedEntryStored(1));
         assertEquals("two", driver.flushedEntryStored(2));
         assertEquals("four", driver.flushedEntryStored(4));
-        assertEquals(null, driver.flushedEntryStored(5));
+        assertNull(null, driver.flushedEntryStored(5));
 
-        assertEquals(false, driver.flushedEntryRemoved(0));
-        assertEquals(false, driver.flushedEntryRemoved(1));
-        assertEquals(false, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
+        assertFalse(driver.flushedEntryRemoved(0));
+        assertFalse(driver.flushedEntryRemoved(1));
+        assertFalse(driver.flushedEntryRemoved(2));
+        assertFalse(driver.flushedEntryRemoved(4));
+        assertTrue(driver.flushedEntryRemoved(5));
     }
 
     @Test
@@ -278,10 +283,10 @@
         assertEquals("two", driver.flushedEntryStored(2));
         assertEquals("four", driver.flushedEntryStored(4));
 
-        assertEquals(false, driver.flushedEntryRemoved(0));
-        assertEquals(false, driver.flushedEntryRemoved(1));
-        assertEquals(false, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(4));
+        assertFalse(driver.flushedEntryRemoved(0));
+        assertFalse(driver.flushedEntryRemoved(1));
+        assertFalse(driver.flushedEntryRemoved(2));
+        assertFalse(driver.flushedEntryRemoved(4));
     }
 
     @Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 3ac05a0..7b8957e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -38,7 +38,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -104,6 +103,23 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     }
 
     @Test
+    public void shouldAvoidFlushingDeletionsWithoutDirtyKeys() {
+        final int added = addItemsToCache();
+        // all dirty entries should have been flushed
+        assertEquals(added, underlyingStore.approximateNumEntries());
+        assertEquals(added, cacheFlushListener.forwarded.size());
+
+        store.put(bytesKey("key"), bytesValue("value"));
+        assertEquals(added, underlyingStore.approximateNumEntries());
+        assertEquals(added, cacheFlushListener.forwarded.size());
+
+        store.put(bytesKey("key"), null);
+        store.flush();
+        assertEquals(added, underlyingStore.approximateNumEntries());
+        assertEquals(added, cacheFlushListener.forwarded.size());
+    }
+
+    @Test
     public void shouldCloseAfterErrorWithFlush() {
         try {
             cache = EasyMock.niceMock(ThreadCache.class);
@@ -139,7 +155,7 @@
     }
 
     @Test
-    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws IOException {
+    public void shouldFlushEvictedItemsIntoUnderlyingStore() {
         final int added = addItemsToCache();
         // all dirty entries should have been flushed
         assertEquals(added, underlyingStore.approximateNumEntries());
@@ -148,7 +164,7 @@
     }
 
     @Test
-    public void shouldForwardDirtyItemToListenerWhenEvicted() throws IOException {
+    public void shouldForwardDirtyItemToListenerWhenEvicted() {
         final int numRecords = addItemsToCache();
         assertEquals(numRecords, cacheFlushListener.forwarded.size());
     }
@@ -166,24 +182,51 @@
         store.setFlushListener(cacheFlushListener, true);
         store.put(bytesKey("1"), bytesValue("a"));
         store.flush();
+        assertEquals("a", cacheFlushListener.forwarded.get("1").newValue);
+        assertNull(cacheFlushListener.forwarded.get("1").oldValue);
         store.put(bytesKey("1"), bytesValue("b"));
+        store.put(bytesKey("1"), bytesValue("c"));
         store.flush();
-        assertEquals("b", cacheFlushListener.forwarded.get("1").newValue);
+        assertEquals("c", cacheFlushListener.forwarded.get("1").newValue);
         assertEquals("a", cacheFlushListener.forwarded.get("1").oldValue);
+        store.put(bytesKey("1"), null);
+        store.flush();
+        assertNull(cacheFlushListener.forwarded.get("1").newValue);
+        assertEquals("c", cacheFlushListener.forwarded.get("1").oldValue);
+        cacheFlushListener.forwarded.clear();
+        store.put(bytesKey("1"), bytesValue("a"));
+        store.put(bytesKey("1"), bytesValue("b"));
+        store.put(bytesKey("1"), null);
+        store.flush();
+        assertNull(cacheFlushListener.forwarded.get("1"));
+        cacheFlushListener.forwarded.clear();
     }
 
     @Test
     public void shouldNotForwardOldValuesWhenDisabled() {
         store.put(bytesKey("1"), bytesValue("a"));
         store.flush();
+        assertEquals("a", cacheFlushListener.forwarded.get("1").newValue);
+        assertNull(cacheFlushListener.forwarded.get("1").oldValue);
         store.put(bytesKey("1"), bytesValue("b"));
         store.flush();
         assertEquals("b", cacheFlushListener.forwarded.get("1").newValue);
         assertNull(cacheFlushListener.forwarded.get("1").oldValue);
+        store.put(bytesKey("1"), null);
+        store.flush();
+        assertNull(cacheFlushListener.forwarded.get("1").newValue);
+        assertNull(cacheFlushListener.forwarded.get("1").oldValue);
+        cacheFlushListener.forwarded.clear();
+        store.put(bytesKey("1"), bytesValue("a"));
+        store.put(bytesKey("1"), bytesValue("b"));
+        store.put(bytesKey("1"), null);
+        store.flush();
+        assertNull(cacheFlushListener.forwarded.get("1"));
+        cacheFlushListener.forwarded.clear();
    }
 
     @Test
-    public void shouldIterateAllStoredItems() throws IOException {
+    public void shouldIterateAllStoredItems() {
         final int items = addItemsToCache();
         final KeyValueIterator all = store.all();
         final List results = new ArrayList<>();
@@ -194,7 +237,7 @@
     }
 
     @Test
-    public void shouldIterateOverRange() throws IOException {
+    public void shouldIterateOverRange() {
         final int items = addItemsToCache();
         final KeyValueIterator range = store.range(bytesKey(String.valueOf(0)), bytesKey(String.valueOf(items)));
         final List results = new ArrayList<>();
@@ -324,7 +367,7 @@
         store.delete(bytesKey("key"));
     }
 
-    private int addItemsToCache() throws IOException {
+    private int addItemsToCache() {
         int cachedSize = 0;
         int i = 0;
         while (cachedSize < maxCacheSizeBytes) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 36426d4..a6f0a71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -44,12 +44,15 @@ import java.util.Random;
 import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 
 @SuppressWarnings("PointlessArithmeticExpression")
 public class CachingSessionStoreTest {
@@ -57,17 +60,17 @@ public class CachingSessionStoreTest {
     private static final int MAX_CACHE_SIZE_BYTES = 600;
     private static final Long DEFAULT_TIMESTAMP = 10L;
     private static final long SEGMENT_INTERVAL = 100L;
-    private RocksDBSegmentedBytesStore underlying;
-    private CachingSessionStore cachingStore;
-    private ThreadCache cache;
 
     private final Bytes keyA = Bytes.wrap("a".getBytes());
     private final Bytes keyAA = Bytes.wrap("aa".getBytes());
     private final Bytes keyB = Bytes.wrap("b".getBytes());
 
+    private CachingSessionStore cachingStore;
+    private ThreadCache cache;
+
     @Before
     public void setUp() {
         final SessionKeySchema schema = new SessionKeySchema();
-        underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema);
+        final RocksDBSegmentedBytesStore underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema);
         final RocksDBSessionStore sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
         cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
@@ -174,12 +177,15 @@
         final Windowed b = new Windowed<>(keyB, new SessionWindow(0, 0));
         cachingStore.put(a, "2".getBytes());
         cachingStore.put(b, "2".getBytes());
-        cachingStore.flush();
         cachingStore.remove(a);
-        cachingStore.flush();
+
         final KeyValueIterator, byte[]> rangeIter =
             cachingStore.findSessions(keyA, 0, 0);
         assertFalse(rangeIter.hasNext());
+
+        assertNull(cachingStore.fetchSession(keyA, 0, 0));
+        assertThat(cachingStore.fetchSession(keyB, 0, 0), equalTo("2".getBytes()));
+
     }
 
     @Test
@@ -211,7 +217,6 @@
         cachingStore.put(a2, "2".getBytes());
         cachingStore.put(a3, "3".getBytes());
         cachingStore.put(aa3, "3".getBytes());
-        cachingStore.flush();
 
         final KeyValueIterator, byte[]> rangeResults =
             cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
@@ -270,10 +275,21 @@
             flushed
         );
         flushed.clear();
+
+        cachingStore.put(a, "1".getBytes());
+        cachingStore.put(a, "2".getBytes());
+        cachingStore.remove(a);
+        cachingStore.flush();
+
+        assertEquals(
+            Collections.emptyList(),
+            flushed
+        );
+        flushed.clear();
     }
 
     @Test
-    public void shouldForwardChangedValuesDuringFlushWhenSendOldValuesDisabledNewRecordIsNull() {
+    public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
         final Windowed a = new Windowed<>(keyA, new SessionWindow(0, 0));
         final Windowed aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
         final List, Change>> flushed = new ArrayList<>();
         cachingStore.setFlushListener(
             (key, newValue, oldValue, timestamp) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
             false
@@ -292,38 +308,25 @@
         cachingStore.flush();
 
         assertEquals(
-            flushed,
             Arrays.asList(
                 KeyValue.pair(aDeserialized, new Change<>("1", null)),
                 KeyValue.pair(aDeserialized, new Change<>("2", null)),
-                KeyValue.pair(aDeserialized, new Change<>(null, "2"))
-            )
-        );
-    }
-
-    @Test
-    public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
-        final Windowed a = new Windowed<>(keyA, new SessionWindow(0, 0));
-        final Windowed aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
-        final List, Change>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(
-            (key, newValue, oldValue, timestamp) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
-            false
+                KeyValue.pair(aDeserialized, new Change<>(null, null))
+            ),
+            flushed
         );
+        flushed.clear();
 
         cachingStore.put(a, "1".getBytes());
-        cachingStore.flush();
-        cachingStore.put(a, "2".getBytes());
+        cachingStore.remove(a);
         cachingStore.flush();
 
         assertEquals(
-            flushed,
-            Arrays.asList(
-                KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                KeyValue.pair(aDeserialized, new Change<>("2", null))
-            )
+            Collections.emptyList(),
+            flushed
        );
+        flushed.clear();
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 610fc60..cb3fcd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -345,11 +345,26 @@ public class CachingWindowStoreTest {
         final Windowed windowedKey =
             new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
         cachingStore.put(bytesKey("1"), bytesValue("a"));
-        cachingStore.flush();
         cachingStore.put(bytesKey("1"), bytesValue("b"));
         cachingStore.flush();
         assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
-        assertEquals("a", cacheListener.forwarded.get(windowedKey).oldValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cacheListener.forwarded.clear();
+        cachingStore.put(bytesKey("1"), bytesValue("c"));
+        cachingStore.flush();
+        assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
+        assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue);
+        cachingStore.put(bytesKey("1"), null);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey).newValue);
+        assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue);
+        cacheListener.forwarded.clear();
+        cachingStore.put(bytesKey("1"), bytesValue("a"));
+        cachingStore.put(bytesKey("1"), bytesValue("b"));
+        cachingStore.put(bytesKey("1"), null);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey));
+        cacheListener.forwarded.clear();
     }
 
     @Test
@@ -357,11 +372,25 @@ public class CachingWindowStoreTest {
         final Windowed windowedKey =
             new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
         cachingStore.put(bytesKey("1"), bytesValue("a"));
-        cachingStore.flush();
         cachingStore.put(bytesKey("1"), bytesValue("b"));
         cachingStore.flush();
         assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
         assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cachingStore.put(bytesKey("1"), bytesValue("c"));
+        cachingStore.flush();
+        assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cachingStore.put(bytesKey("1"), null);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cacheListener.forwarded.clear();
+        cachingStore.put(bytesKey("1"), bytesValue("a"));
+        cachingStore.put(bytesKey("1"), bytesValue("b"));
+        cachingStore.put(bytesKey("1"), null);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey));
+        cacheListener.forwarded.clear();
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 802df8d..3653e7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -32,11 +32,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.apache.kafka.test.StreamsTestUtils.valuesToList;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -272,20 +273,5 @@ public class RocksDBSessionStoreTest {
         sessionStore.put(null, 1L);
     }
 
-    static List, V>> toList(final KeyValueIterator, V> iterator) {
-        final List, V>> results = new ArrayList<>();
-        while (iterator.hasNext()) {
-            results.add(iterator.next());
-        }
-        return results;
-    }
-
-    private static List valuesToList(final KeyValueIterator, V> iterator) {
-        final List results = new ArrayList<>();
-        while (iterator.hasNext()) {
-            results.add(iterator.next().value);
-        }
-        return results;
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 1dcebb5..87a693d 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -88,6 +88,15 @@ public final class StreamsTestUtils {
         return results;
     }
 
+    public static List valuesToList(final Iterator> iterator) {
+        final List results = new ArrayList<>();
+
+        while (iterator.hasNext()) {
+            results.add(iterator.next().value);
+        }
+        return results;
+    }
+
     public static void verifyKeyValueList(final List> expected, final List> actual) {
         assertThat(actual.size(), equalTo(expected.size()));
         for (int i = 0; i < actual.size(); i++) {