kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvcep...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes (#8764)
Date Fri, 12 Jun 2020 16:55:31 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new f37060c  KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes (#8764)
f37060c is described below

commit f37060cab3e29bb8722f69803d7789ab0b3f2ff0
Author: Adam Bellemare <adam.bellemare@shopify.com>
AuthorDate: Fri Jun 12 11:00:38 2020 -0400

    KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes (#8764)
    
    Bug Details:
    Mistakenly setting the value serde to the key serde for an internal wrapped serde in the FKJ workflow.
    
    Testing:
    Modified the existing test to reproduce the issue, then verified that the test passes.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, John Roesler <vvcephei@apache.org>
---
 checkstyle/import-control.xml                      |   1 +
 .../kstream/internals/ChangedDeserializer.java     |   6 +-
 .../kstream/internals/ChangedSerializer.java       |   6 +-
 .../internals/WrappingNullableDeserializer.java    |   6 +-
 .../internals/WrappingNullableSerializer.java      |   4 +-
 .../SubscriptionResponseWrapperSerde.java          |  12 +--
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   |  12 +--
 .../streams/processor/internals/SinkNode.java      |   7 +-
 .../streams/processor/internals/SourceNode.java    |   7 +-
 ...KTableKTableForeignKeyJoinDefaultSerdeTest.java | 103 +++++++++++----------
 10 files changed, 86 insertions(+), 78 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e3975a3..dd8d0d7 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -251,6 +251,7 @@
       <allow pkg="kafka.log" />
       <allow pkg="scala" />
       <allow class="kafka.zk.EmbeddedZookeeper"/>
+      <allow pkg="com.fasterxml.jackson" />
     </subpackage>
 
     <subpackage name="test">
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 90d5882..433a18d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class ChangedDeserializer<T> implements Deserializer<Change<T>>,
WrappingNullableDeserializer<Change<T>, T> {
+public class ChangedDeserializer<T> implements Deserializer<Change<T>>,
WrappingNullableDeserializer<Change<T>, Void, T> {
 
     private static final int NEWFLAG_SIZE = 1;
 
@@ -37,9 +37,9 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>>,
Wrapping
     }
 
     @Override
-    public void setIfUnset(final Deserializer<T> defaultDeserializer) {
+    public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final Deserializer<T>
defaultValueDeserializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer cannot
be null");
+            inner = Objects.requireNonNull(defaultValueDeserializer);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 551d948..f5d63cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>,
T> {
+public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>,
Void, T> {
 
     private static final int NEWFLAG_SIZE = 1;
 
@@ -38,9 +38,9 @@ public class ChangedSerializer<T> implements Serializer<Change<T>>,
WrappingNull
     }
 
     @Override
-    public void setIfUnset(final Serializer<T> defaultSerializer) {
+    public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<T>
defaultValueSerializer) {
         if (inner == null) {
-            inner = Objects.requireNonNull(defaultSerializer, "defaultSerializer cannot be
null");
+            inner = Objects.requireNonNull(defaultValueSerializer);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
index a57e9a1..d0c0b14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableDeserializer.java
@@ -18,6 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Deserializer;
 
-public interface WrappingNullableDeserializer<Outer, Inner> extends Deserializer<Outer>
{
-    void setIfUnset(final Deserializer<Inner> defaultDeserializer);
-}
+public interface WrappingNullableDeserializer<Outer, InnerK, InnerV> extends Deserializer<Outer>
{
+    void setIfUnset(final Deserializer<InnerK> defaultKeyDeserializer, final Deserializer<InnerV>
defaultValueDeserializer);
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
index 2d28e52..8854a8d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerializer.java
@@ -18,6 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
 
-public interface WrappingNullableSerializer<Outer, Inner> extends Serializer<Outer>
{
-    void setIfUnset(final Serializer<Inner> defaultSerializer);
+public interface WrappingNullableSerializer<Outer, InnerK, InnerV> extends Serializer<Outer>
{
+    void setIfUnset(final Serializer<InnerK> defaultKeySerializer, final Serializer<InnerV>
defaultValueSerializer);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 31317c5..8619111 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -46,7 +46,7 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
     }
 
     private static final class SubscriptionResponseWrapperSerializer<V>
-        implements Serializer<SubscriptionResponseWrapper<V>>, WrappingNullableSerializer<SubscriptionResponseWrapper<V>,
V> {
+        implements Serializer<SubscriptionResponseWrapper<V>>, WrappingNullableSerializer<SubscriptionResponseWrapper<V>,
Void, V> {
 
         private Serializer<V> serializer;
 
@@ -55,9 +55,9 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
         }
 
         @Override
-        public void setIfUnset(final Serializer<V> defaultSerializer) {
+        public void setIfUnset(final Serializer<Void> defaultKeySerializer, final Serializer<V>
defaultValueSerializer) {
             if (serializer == null) {
-                serializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer
cannot be null");
+                serializer = Objects.requireNonNull(defaultValueSerializer);
             }
         }
 
@@ -94,7 +94,7 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
     }
 
     private static final class SubscriptionResponseWrapperDeserializer<V>
-        implements Deserializer<SubscriptionResponseWrapper<V>>, WrappingNullableDeserializer<SubscriptionResponseWrapper<V>,
V> {
+        implements Deserializer<SubscriptionResponseWrapper<V>>, WrappingNullableDeserializer<SubscriptionResponseWrapper<V>,
Void, V> {
 
         private Deserializer<V> deserializer;
 
@@ -103,9 +103,9 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
         }
 
         @Override
-        public void setIfUnset(final Deserializer<V> defaultDeserializer) {
+        public void setIfUnset(final Deserializer<Void> defaultKeyDeserializer, final
Deserializer<V> defaultValueDeserializer) {
             if (deserializer == null) {
-                deserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer
cannot be null");
+                deserializer = Objects.requireNonNull(defaultValueDeserializer);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index 136128c..d2cc989 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -50,7 +50,7 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
     }
 
     private static class SubscriptionWrapperSerializer<K>
-        implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>,
K> {
+        implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>,
K, Void> {
 
         private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
         private String primaryKeySerializationPseudoTopic = null;
@@ -63,9 +63,9 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
         }
 
         @Override
-        public void setIfUnset(final Serializer<K> defaultSerializer) {
+        public void setIfUnset(final Serializer<K> defaultKeySerializer, final Serializer<Void>
defaultValueSerializer) {
             if (primaryKeySerializer == null) {
-                primaryKeySerializer = Objects.requireNonNull(defaultSerializer, "defaultSerializer
cannot be null");
+                primaryKeySerializer = Objects.requireNonNull(defaultKeySerializer);
             }
         }
 
@@ -110,7 +110,7 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
     }
 
     private static class SubscriptionWrapperDeserializer<K>
-        implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>,
K> {
+        implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>,
K, Void> {
 
         private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
         private String primaryKeySerializationPseudoTopic = null;
@@ -123,9 +123,9 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
         }
 
         @Override
-        public void setIfUnset(final Deserializer<K> defaultDeserializer) {
+        public void setIfUnset(final Deserializer<K> defaultKeyDeserializer, final
Deserializer<Void> defaultValueDeserializer) {
             if (primaryKeyDeserializer == null) {
-                primaryKeyDeserializer = Objects.requireNonNull(defaultDeserializer, "defaultDeserializer
cannot be null");
+                primaryKeyDeserializer = Objects.requireNonNull(defaultKeyDeserializer);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index e3333be..b0da7b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -66,10 +66,13 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
             valSerializer = (Serializer<V>) context.valueSerde().serializer();
         }
 
-        // if value serializers are internal wrapping serializers that may need to be given
the default serializer
+        // if serializers are internal wrapping serializers that may need to be given the
default serializer
         // then pass it the default one from the context
         if (valSerializer instanceof WrappingNullableSerializer) {
-            ((WrappingNullableSerializer) valSerializer).setIfUnset(context.valueSerde().serializer());
+            ((WrappingNullableSerializer) valSerializer).setIfUnset(
+                context.keySerde().serializer(),
+                context.valueSerde().serializer()
+            );
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 853520a..2dd1e6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -88,10 +88,13 @@ public class SourceNode<K, V> extends ProcessorNode<K, V>
{
             this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
         }
 
-        // if value deserializers are internal wrapping deserializers that may need to be
given the default
+        // if deserializers are internal wrapping deserializers that may need to be given
the default
         // then pass it the default one from the context
         if (valDeserializer instanceof WrappingNullableDeserializer) {
-            ((WrappingNullableDeserializer) valDeserializer).setIfUnset(context.valueSerde().deserializer());
+            ((WrappingNullableDeserializer) valDeserializer).setIfUnset(
+                    context.keySerde().deserializer(),
+                    context.valueSerde().deserializer()
+            );
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
index 64c6b06..0a6b1bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -52,17 +54,17 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
     @Test
     public void shouldWorkWithDefaultSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> aTable = builder.table("A");
-        final KTable<String, String> bTable = builder.table("B");
+        final KTable<Integer, String> aTable = builder.table("A");
+        final KTable<Integer, String> bTable = builder.table("B");
 
-        final KTable<String, String> fkJoinResult = aTable.join(
+        final KTable<Integer, String> fkJoinResult = aTable.join(
             bTable,
-            value -> value.split("-")[0],
+            value -> Integer.parseInt(value.split("-")[0]),
             (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
             Materialized.as("asdf")
         );
 
-        final KTable<String, String> finalJoinResult = aTable.join(
+        final KTable<Integer, String> finalJoinResult = aTable.join(
             fkJoinResult,
             (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
         );
@@ -75,17 +77,17 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
     @Test
     public void shouldWorkWithDefaultAndConsumedSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> aTable = builder.table("A", Consumed.with(Serdes.String(),
Serdes.String()));
-        final KTable<String, String> bTable = builder.table("B");
+        final KTable<Integer, String> aTable = builder.table("A", Consumed.with(Serdes.Integer(),
Serdes.String()));
+        final KTable<Integer, String> bTable = builder.table("B");
 
-        final KTable<String, String> fkJoinResult = aTable.join(
+        final KTable<Integer, String> fkJoinResult = aTable.join(
             bTable,
-            value -> value.split("-")[0],
+            value -> Integer.parseInt(value.split("-")[0]),
             (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
             Materialized.as("asdf")
         );
 
-        final KTable<String, String> finalJoinResult = aTable.join(
+        final KTable<Integer, String> finalJoinResult = aTable.join(
             fkJoinResult,
             (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
         );
@@ -98,20 +100,19 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
     @Test
     public void shouldWorkWithDefaultAndJoinResultSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> aTable = builder.table("A");
-        final KTable<String, String> bTable = builder.table("B");
+        final KTable<Integer, String> aTable = builder.table("A");
+        final KTable<Integer, String> bTable = builder.table("B");
 
-        final KTable<String, String> fkJoinResult = aTable.join(
+        final KTable<Integer, String> fkJoinResult = aTable.join(
             bTable,
-            value -> value.split("-")[0],
+            value -> Integer.parseInt(value.split("-")[0]),
             (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
-            Materialized
-                .<String, String, KeyValueStore<Bytes, byte[]>>as("asdf")
-                .withKeySerde(Serdes.String())
-                .withValueSerde(Serdes.String())
+            Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as("asdf")
+                    .withKeySerde(Serdes.Integer())
+                    .withValueSerde(Serdes.String())
         );
 
-        final KTable<String, String> finalJoinResult = aTable.join(
+        final KTable<Integer, String> finalJoinResult = aTable.join(
             fkJoinResult,
             (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
         );
@@ -124,20 +125,20 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
     @Test
     public void shouldWorkWithDefaultAndEquiJoinResultSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> aTable = builder.table("A");
-        final KTable<String, String> bTable = builder.table("B");
+        final KTable<Integer, String> aTable = builder.table("A");
+        final KTable<Integer, String> bTable = builder.table("B");
 
-        final KTable<String, String> fkJoinResult = aTable.join(
+        final KTable<Integer, String> fkJoinResult = aTable.join(
             bTable,
-            value -> value.split("-")[0],
+            value -> Integer.parseInt(value.split("-")[0]),
             (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
             Materialized.as("asdf")
         );
 
-        final KTable<String, String> finalJoinResult = aTable.join(
+        final KTable<Integer, String> finalJoinResult = aTable.join(
             fkJoinResult,
             (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")",
-            Materialized.with(Serdes.String(), Serdes.String())
+            Materialized.with(Serdes.Integer(), Serdes.String())
         );
 
         finalJoinResult.toStream().to("output");
@@ -148,22 +149,22 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
     @Test
     public void shouldWorkWithDefaultAndProducedSerdes() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, String> aTable = builder.table("A");
-        final KTable<String, String> bTable = builder.table("B");
+        final KTable<Integer, String> aTable = builder.table("A");
+        final KTable<Integer, String> bTable = builder.table("B");
 
-        final KTable<String, String> fkJoinResult = aTable.join(
+        final KTable<Integer, String> fkJoinResult = aTable.join(
             bTable,
-            value -> value.split("-")[0],
+            value -> Integer.parseInt(value.split("-")[0]),
             (aVal, bVal) -> "(" + aVal + "," + bVal + ")",
             Materialized.as("asdf")
         );
 
-        final KTable<String, String> finalJoinResult = aTable.join(
+        final KTable<Integer, String> finalJoinResult = aTable.join(
             fkJoinResult,
             (aVal, fkJoinVal) -> "(" + aVal + "," + fkJoinVal + ")"
         );
 
-        finalJoinResult.toStream().to("output", Produced.with(Serdes.String(), Serdes.String()));
+        finalJoinResult.toStream().to("output", Produced.with(Serdes.Integer(), Serdes.String()));
 
         validateTopologyCanProcessData(builder);
     }
@@ -184,20 +185,20 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
         final String rightTable = "right_table";
         final String output = "output-topic";
 
-        final KTable<String, String> left = builder.table(
+        final KTable<Integer, String> left = builder.table(
             leftTable,
-            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
-                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+            Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
+                        serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
         );
-        final KTable<String, String> right = builder.table(
-            rightTable,
-            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
-                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        final KTable<Integer, String> right = builder.table(
+                rightTable,
+                Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true),
+                              serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
         );
 
         left.join(
             right,
-            value -> value.split("\\|")[1],
+            value -> Integer.parseInt(value.split("\\|")[1]),
             (value1, value2) -> "(" + value1 + "," + value2 + ")",
             Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig,
false)
             ))
@@ -207,10 +208,10 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
 
         final Topology topology = builder.build(streamsConfig);
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig))
{
-            final TestInputTopic<String, String> leftInput = driver.createInputTopic(leftTable,
new StringSerializer(), new StringSerializer());
-            final TestInputTopic<String, String> rightInput = driver.createInputTopic(rightTable,
new StringSerializer(), new StringSerializer());
-            leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
-            rightInput.pipeInput("rhs1", "rhsValue1");
+            final TestInputTopic<Integer, String> leftInput = driver.createInputTopic(leftTable,
new IntegerSerializer(), new StringSerializer());
+            final TestInputTopic<Integer, String> rightInput = driver.createInputTopic(rightTable,
new IntegerSerializer(), new StringSerializer());
+            leftInput.pipeInput(2, "lhsValue1|1");
+            rightInput.pipeInput(1, "rhsValue1");
         }
         // verifying primarily that no extra pseudo-topics were used, but it's nice to also
verify the rest of the
         // topics our serdes serialize data for
@@ -237,17 +238,17 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
         final Properties config = new Properties();
         config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + UUID.randomUUID());
         config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
         config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(),
config)) {
-            final TestInputTopic<String, String> aTopic = topologyTestDriver.createInputTopic("A",
new StringSerializer(), new StringSerializer());
-            final TestInputTopic<String, String> bTopic = topologyTestDriver.createInputTopic("B",
new StringSerializer(), new StringSerializer());
-            final TestOutputTopic<String, String> output = topologyTestDriver.createOutputTopic("output",
new StringDeserializer(), new StringDeserializer());
-            aTopic.pipeInput("a1", "b1-alpha");
-            bTopic.pipeInput("b1", "beta");
-            final Map<String, String> x = output.readKeyValuesToMap();
-            assertThat(x, is(Collections.singletonMap("a1", "(b1-alpha,(b1-alpha,beta))")));
+            final TestInputTopic<Integer, String> aTopic = topologyTestDriver.createInputTopic("A",
new IntegerSerializer(), new StringSerializer());
+            final TestInputTopic<Integer, String> bTopic = topologyTestDriver.createInputTopic("B",
new IntegerSerializer(), new StringSerializer());
+            final TestOutputTopic<Integer, String> output = topologyTestDriver.createOutputTopic("output",
new IntegerDeserializer(), new StringDeserializer());
+            aTopic.pipeInput(1, "999-alpha");
+            bTopic.pipeInput(999, "beta");
+            final Map<Integer, String> x = output.readKeyValuesToMap();
+            assertThat(x, is(Collections.singletonMap(1, "(999-alpha,(999-alpha,beta))")));
         }
     }
 }


Mime
View raw message