kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4532: StateStores can be connected to the wrong source topic resulting in incorrect metadata returned from Interactive Queries
Date Tue, 13 Dec 2016 20:34:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 859113786 -> 448f194c7


KAFKA-4532: StateStores can be connected to the wrong source topic resulting in incorrect
metadata returned from Interactive Queries

When building a topology with tables and StateStores, each StateStore is mapped to the names of
its source topics. This mapping is retrieved via TopologyBuilder.stateStoreNameToSourceTopics()
and is used by Interactive Queries to find the source topics and partitions when resolving which
partition a particular key is in.
There is an issue whereby this mapping, for a table that is originally created with
builder.table("topic", "table") and subsequently used in a join, is changed to point at the
internal repartition topic. This happens because the mapping is updated during the call to
topology.connectProcessorAndStateStores(..).
If the stateStoreNameToSourceTopics map already contains an entry for the state store name, the
map should not be updated.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2250 from dguy/kafka-4532


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/448f194c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/448f194c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/448f194c

Branch: refs/heads/trunk
Commit: 448f194c70e7a66ae2f1a7e89c822032359b14c9
Parents: 8591137
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Dec 13 12:34:46 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Dec 13 12:34:46 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/processor/TopologyBuilder.java |  7 +++++++
 .../streams/kstream/KStreamBuilderTest.java      | 19 +++++++++++++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/448f194c/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ecac8c9..74fea9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -692,6 +692,13 @@ public class TopologyBuilder {
     private void connectStateStoreNameToSourceTopics(final String stateStoreName,
                                                      final ProcessorNodeFactory processorNodeFactory)
{
 
+        // we should never update the mapping from state store names to source topics if
the store name already exists
+        // in the map; this scenario is possible, for example, that a state store underlying
a source KTable is
+        // connecting to a join operator whose source topic is not the original KTable's
source topic but an internal repartition topic.
+        if (stateStoreNameToSourceTopics.containsKey(stateStoreName)) {
+            return;
+        }
+
         final Set<String> sourceTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents);
         if (sourceTopics.isEmpty()) {
             throw new TopologyBuilderException("can't find source topic for state store "
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/448f194c/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index b951743..52decf4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
 import org.junit.After;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
@@ -150,4 +153,20 @@ public class KStreamBuilderTest {
         new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null, null);
     }
 
+    @Test
+    public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId("app-id");
+
+        final KStream<String, String> playEvents = builder.stream("events");
+
+        final KTable<String, String> table = builder.table("table-topic", "table-store");
+        assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
+
+        final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String,
String>SelectValueKeyValueMapper());
+        mapped.leftJoin(table, MockValueJoiner.STRING_JOINER).groupByKey().count("count");
+        assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
+        assertEquals(Collections.singleton("app-id-KSTREAM-MAP-0000000003-repartition"),
builder.stateStoreNameToSourceTopics().get("count"));
+    }
+
 }


Mime
View raw message