kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5956; use serdes from materialized in table and globalTable
Date Fri, 22 Sep 2017 12:45:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7c988a3c8 -> 125d8d6f7


KAFKA-5956; use serdes from materialized in table and globalTable

The new overloads `StreamsBuilder.table(String, Materialized)` and `StreamsBuilder.globalTable(String,
Materialized)` need to set the serdes from `Materialized` on the internal `Consumed` instance
that is created, otherwise the defaults will be used and may result in serialization errors.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3936 from dguy/table-materialized


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/125d8d6f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/125d8d6f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/125d8d6f

Branch: refs/heads/trunk
Commit: 125d8d6f70829b9a0dbeabfef8f6b2df438dc12b
Parents: 7c988a3
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Sep 22 13:45:19 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Sep 22 13:45:19 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/StreamsBuilder.java    | 10 ++--
 .../kafka/streams/StreamsBuilderTest.java       | 53 ++++++++++++++++++++
 2 files changed, 60 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/125d8d6f/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a272ec4..7e746e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -301,8 +301,10 @@ public class StreamsBuilder {
                                                   final Materialized<K, V, KeyValueStore<Bytes,
byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
         return internalStreamsBuilder.table(topic,
-                                            new ConsumedInternal<K, V>(),
+                                            new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
+                                                                                 materializedInternal.valueSerde())),
                                             new MaterializedInternal<>(materialized));
     }
 
@@ -429,9 +431,11 @@ public class StreamsBuilder {
                                                               final Materialized<K, V,
KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
         return internalStreamsBuilder.globalTable(topic,
-                                                  new ConsumedInternal<K, V>(),
-                                                  new MaterializedInternal<>(materialized));
+                                                  new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
+                                                                                       materializedInternal.valueSerde())),
+                                                  materializedInternal);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/125d8d6f/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index dedd157..4ce202b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -16,21 +16,31 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class StreamsBuilderTest {
@@ -108,6 +118,49 @@ public class StreamsBuilderTest {
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
     }
 
+    @Test
+    public void shouldUseSerdesDefinedInMaterializedToConsumeTable() {
+        final Map<Long, String> results = new HashMap<>();
+        final String topic = "topic";
+        final ForeachAction<Long, String> action = new ForeachAction<Long, String>()
{
+            @Override
+            public void apply(final Long key, final String value) {
+                results.put(key, value);
+            }
+        };
+        builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")
+                .withKeySerde(Serdes.Long())
+                .withValueSerde(Serdes.String()))
+                .toStream().foreach(action);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+        driver.setTime(0L);
+        driver.process(topic, 1L, "value1");
+        driver.process(topic, 2L, "value2");
+        driver.flushState();
+        final KeyValueStore<Long, String> store = (KeyValueStore) driver.allStateStores().get("store");
+        assertThat(store.get(1L), equalTo("value1"));
+        assertThat(store.get(2L), equalTo("value2"));
+        assertThat(results.get(1L), equalTo("value1"));
+        assertThat(results.get(2L), equalTo("value2"));
+    }
+
+    @Test
+    public void shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable() {
+        final String topic = "topic";
+        builder.globalTable(topic, Materialized.<Long, String, KeyValueStore<Bytes,
byte[]>>as("store")
+                .withKeySerde(Serdes.Long())
+                .withValueSerde(Serdes.String()));
+        driver.setUp(builder, TestUtils.tempDirectory());
+        driver.setTime(0L);
+        driver.process(topic, 1L, "value1");
+        driver.process(topic, 2L, "value2");
+        driver.flushState();
+        final KeyValueStore<Long, String> store = (KeyValueStore) driver.allStateStores().get("store");
+        assertThat(store.get(1L), equalTo("value1"));
+        assertThat(store.get(2L), equalTo("value2"));
+    }
+
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
         builder.stream(Collections.<String>emptyList());


Mime
View raw message