This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.6 by this push: new 272e766 KAFKA-10030: Allow fetching a key from a single partition (#8706) 272e766 is described below commit 272e766212d9e3a2ee2513b922c609334060f451 Author: Dima Reznik AuthorDate: Tue Jun 2 04:06:28 2020 +0300 KAFKA-10030: Allow fetching a key from a single partition (#8706) Reviewers: Navinder Pal Singh Brar , Boyang Chen , Matthias J. Sax --- .../state/internals/QueryableStoreProvider.java | 14 ++++- .../internals/StreamThreadStateStoreProvider.java | 5 +- .../integration/StoreQueryIntegrationTest.java | 69 ++++++++++++++++++++++ .../internals/QueryableStoreProviderTest.java | 42 ++++++++++++- .../StreamThreadStateStoreProviderTest.java | 14 ++--- .../apache/kafka/test/StateStoreProviderStub.java | 29 +++++++-- 6 files changed, 151 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java index 8917164..2af5874 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java @@ -58,9 +58,21 @@ public class QueryableStoreProvider { } final List allStores = new ArrayList<>(); for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { - allStores.addAll(storeProvider.stores(storeQueryParameters)); + final List stores = storeProvider.stores(storeQueryParameters); + if (!stores.isEmpty()) { + allStores.addAll(stores); + if (storeQueryParameters.partition() != null) { + break; + } + } } if (allStores.isEmpty()) { + if (storeQueryParameters.partition() != null) { + throw new InvalidStateStoreException( + String.format("The specified partition %d for store %s does not exist.", + storeQueryParameters.partition(), + storeName)); + } throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance."); } return queryableStoreType.create( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 57d16ee..7cc263a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -61,10 +61,7 @@ public class StreamThreadStateStoreProvider { if (keyTaskId != null) { final Task task = tasks.get(keyTaskId); if (task == null) { - throw new InvalidStateStoreException( - String.format("The specified partition %d for store %s does not exist.", - storeQueryParams.partition(), - storeName)); + return Collections.emptyList(); } final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId); if (store != null) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index e09f420..fb73101 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -62,6 +62,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertTrue; @Category({IntegrationTest.class}) @@ -296,6 +297,74 @@ public class StoreQueryIntegrationTest { assertThat(store4.get(key), is(nullValue())); } + @Test + public void shouldQuerySpecificStalePartitionStoresMultiStreamThreads() throws Exception { + final int batch1NumMessages = 100; + final int key = 1; + final Semaphore semaphore = new Semaphore(0); + final int numStreamThreads = 2; + + final StreamsBuilder builder = new StreamsBuilder(); + builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), + Materialized.>as(TABLE_NAME) + .withCachingDisabled()) + .toStream() + .peek((k, v) -> semaphore.release()); + + final Properties streamsConfiguration1 = streamsConfiguration(); + streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads); + + final Properties streamsConfiguration2 = streamsConfiguration(); + streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads); + + final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration1); + final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration2); + final List kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2); + + startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60)); + + assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1); + assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1); + + produceValueRange(key, 0, batch1NumMessages); + + // Assert that all messages in the first batch were processed in a timely manner + assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); + final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer()); + + //key belongs to this partition + final int keyPartition = keyQueryMetadata.getPartition(); + + //key doesn't belongs to this partition + final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0; + final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); + + // Assert that both active and standby are able to query for a key + final StoreQueryParameters> param = StoreQueryParameters + .fromNameAndType(TABLE_NAME, queryableStoreType) + .enableStaleStores() + .withPartition(keyPartition); + TestUtils.waitForCondition(() -> { + final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(kafkaStreams1, param); + return store1.get(key) != null; + }, "store1 cannot find results for key"); + TestUtils.waitForCondition(() -> { + final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(kafkaStreams2, param); + return store2.get(key) != null; + }, "store2 cannot find results for key"); + + final StoreQueryParameters> otherParam = StoreQueryParameters + .fromNameAndType(TABLE_NAME, queryableStoreType) + .enableStaleStores() + .withPartition(keyDontBelongPartition); + final ReadOnlyKeyValueStore store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam); + final ReadOnlyKeyValueStore store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam); + + // Assert that + assertThat(store3.get(key), is(nullValue())); + assertThat(store4.get(key), is(nullValue())); + } + private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) { final KafkaStreams streams = new KafkaStreams(builder.build(config), config); streamsToCleanup.add(streams); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java index 19a0355..2d04755 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java @@ -30,6 +30,9 @@ import org.junit.Test; import java.util.Collections; import java.util.HashMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertNotNull; public class QueryableStoreProviderTest { @@ -38,12 +41,15 @@ public class QueryableStoreProviderTest { private final String windowStore = "window-store"; private QueryableStoreProvider storeProvider; private HashMap globalStateStores; + private final int numStateStorePartitions = 2; @Before public void before() { final StateStoreProviderStub theStoreProvider = new StateStoreProviderStub(false); - theStoreProvider.addStore(keyValueStore, new NoOpReadOnlyStore<>()); - theStoreProvider.addStore(windowStore, new NoOpWindowStore()); + for (int partition = 0; partition < numStateStorePartitions; partition++) { + theStoreProvider.addStore(keyValueStore, partition, new NoOpReadOnlyStore<>()); + theStoreProvider.addStore(windowStore, partition, new NoOpWindowStore()); + } globalStateStores = new HashMap<>(); storeProvider = new QueryableStoreProvider( @@ -88,5 +94,37 @@ public class QueryableStoreProviderTest { assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore()))); } + @Test + public void shouldReturnKVStoreWithPartitionWhenItExists() { + assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1))); + } + + @Test + public void shouldThrowExceptionWhenKVStoreWithPartitionDoesntExists() { + final int partition = numStateStorePartitions + 1; + final InvalidStateStoreException thrown = assertThrows(InvalidStateStoreException.class, () -> + storeProvider.getStore( + StoreQueryParameters + .fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()) + .withPartition(partition)) + ); + assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, keyValueStore))); + } + @Test + public void shouldReturnWindowStoreWithPartitionWhenItExists() { + assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.windowStore()).withPartition(numStateStorePartitions - 1))); + } + + @Test + public void shouldThrowExceptionWhenWindowStoreWithPartitionDoesntExists() { + final int partition = numStateStorePartitions + 1; + final InvalidStateStoreException thrown = assertThrows(InvalidStateStoreException.class, () -> + storeProvider.getStore( + StoreQueryParameters + .fromNameAndType(windowStore, QueryableStoreTypes.windowStore()) + .withPartition(partition)) + ); + assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, windowStore))); + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index b7d2d32..a171a46 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -76,9 +76,7 @@ import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; -import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; public class StreamThreadStateStoreProviderTest { @@ -326,16 +324,12 @@ public class StreamThreadStateStoreProviderTest { } @Test - public void shouldThrowForInvalidPartitions() { + public void shouldReturnEmptyListForInvalidPartitions() { mockThread(true); - final InvalidStateStoreException thrown = assertThrows( - InvalidStateStoreException.class, - () -> provider.stores( - StoreQueryParameters - .fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()) - .withPartition(2)) + assertEquals( + Collections.emptyList(), + provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()).withPartition(2)) ); - assertThat(thrown.getMessage(), equalTo("The specified partition 2 for store kv-store does not exist.")); } @Test diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java index 13a29e1..bc0e33a 100644 --- a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java +++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java @@ -22,16 +22,22 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider; +import java.util.AbstractMap.SimpleEntry; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; public class StateStoreProviderStub extends StreamThreadStateStoreProvider { - private final Map stores = new HashMap<>(); + // -> state store + private final Map, StateStore> stores = new HashMap<>(); private final boolean throwException; + private final int defaultStorePartition = 0; + public StateStoreProviderStub(final boolean throwException) { super(null, null); this.throwException = throwException; @@ -45,15 +51,28 @@ public class StateStoreProviderStub extends StreamThreadStateStoreProvider { if (throwException) { throw new InvalidStateStoreException("store is unavailable"); } - if (stores.containsKey(storeName) && queryableStoreType.accepts(stores.get(storeName))) { - return (List) Collections.singletonList(stores.get(storeName)); + if (storeQueryParameters.partition() != null) { + final Entry stateStoreKey = new SimpleEntry<>(storeName, storeQueryParameters.partition()); + if (stores.containsKey(stateStoreKey) && queryableStoreType.accepts(stores.get(stateStoreKey))) { + return (List) Collections.singletonList(stores.get(stateStoreKey)); + } + return Collections.emptyList(); } - return Collections.emptyList(); + return (List) Collections.unmodifiableList( + stores.entrySet().stream(). + filter(entry -> entry.getKey().getKey().equals(storeName) && queryableStoreType.accepts(entry.getValue())). + map(Entry::getValue). + collect(Collectors.toList())); } public void addStore(final String storeName, final StateStore store) { - stores.put(storeName, store); + addStore(storeName, defaultStorePartition, store); } + public void addStore(final String storeName, + final int partition, + final StateStore store) { + stores.put(new SimpleEntry<>(storeName, partition), store); + } }