kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7110: Add windowed changelog serde (#5307)
Date Fri, 04 Jan 2019 19:42:43 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 329297b  KAFKA-7110: Add windowed changelog serde (#5307)
329297b is described below

commit 329297ba3840df9cead731adf3448edebf6e37e2
Author: Shawn Nguyen <shawnsnguyen@gmail.com>
AuthorDate: Fri Jan 4 11:42:33 2019 -0800

    KAFKA-7110: Add windowed changelog serde (#5307)
    
    Currently the TimeWindowedSerde does not deserialize the windowed keys from a changelog
    topic properly. There are a few assumptions made in the TimeWindowedDeserializer that
    prevent the changelog windowed keys from being correctly deserialized. This PR will
    introduce a new WindowSerde to allow proper deserialization of changelog windowed keys.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/kstream/TimeWindowedDeserializer.java  | 15 ++++++++--
 .../kafka/streams/kstream/WindowedSerdes.java      | 20 +++++++++++++
 .../state/internals/CachingWindowStore.java        |  2 +-
 ...rgedSortedCacheWindowStoreKeyValueIterator.java |  2 +-
 .../streams/state/internals/WindowKeySchema.java   |  7 +++--
 .../internals/RocksDBSegmentedBytesStoreTest.java  |  2 +-
 .../state/internals/WindowKeySchemaTest.java       | 34 +++++++++++++++++-----
 7 files changed, 67 insertions(+), 15 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
index cb9c506..4adae4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java
@@ -34,7 +34,8 @@ import java.util.Map;
 public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>>
{
 
     private final Long windowSize;
-    
+    private boolean isChangelogTopic;
+
     private Deserializer<T> inner;
     
     // Default constructor needed by Kafka
@@ -50,6 +51,7 @@ public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>>
{
     public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize)
{
         this.inner = inner;
         this.windowSize = windowSize;
+        this.isChangelogTopic = false;
     }
 
     public Long getWindowSize() {
@@ -77,6 +79,11 @@ public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>>
{
             return null;
         }
 
+        // toStoreKeyBinary was used to serialize the data.
+        if (this.isChangelogTopic)
+            return WindowKeySchema.fromStoreKey(data, windowSize, inner, topic);
+
+        // toBinary was used to serialize the data
         return WindowKeySchema.from(data, windowSize, inner, topic);
     }
 
@@ -84,7 +91,11 @@ public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>>
{
     public void close() {
         inner.close();
     }
-    
+
+    public void setIsChangelogTopic(final boolean isChangelogTopic) {
+        this.isChangelogTopic = isChangelogTopic;
+    }
+
     // Only for testing
     Deserializer<T> innerDeserializer() {
         return inner;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
index 2474860..44489fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
@@ -30,6 +30,18 @@ public class WindowedSerdes {
         public TimeWindowedSerde(final Serde<T> inner) {
             super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer()));
         }
+
+        // This constructor can be used for serialize/deserialize a windowed topic
+        public TimeWindowedSerde(final Serde<T> inner, final long windowSize) {
+            super(new TimeWindowedSerializer<>(inner.serializer()), new TimeWindowedDeserializer<>(inner.deserializer(),
windowSize));
+        }
+
+        // Helper method for users to specify whether the input topic is a changelog topic
for deserializing the key properly.
+        public TimeWindowedSerde<T> forChangelog(final boolean isChangelogTopic) {
+            final TimeWindowedDeserializer deserializer = (TimeWindowedDeserializer) this.deserializer();
+            deserializer.setIsChangelogTopic(isChangelogTopic);
+            return this;
+        }
     }
 
     static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>
{
@@ -51,6 +63,14 @@ public class WindowedSerdes {
     }
 
     /**
+     * Construct a {@code TimeWindowedSerde} object to deserialize changelog topic
+     * for the specified inner class type and window size.
+     */
+    static public <T> Serde<Windowed<T>> timeWindowedSerdeFrom(final Class<T>
type, final long windowSize) {
+        return new TimeWindowedSerde<>(Serdes.serdeFrom(type), windowSize);
+    }
+
+    /**
      * Construct a {@code SessionWindowedSerde} object for the specified inner class type.
      */
     static public <T> Serde<Windowed<T>> sessionWindowedSerdeFrom(final
Class<T> type) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index c04921b..a934e81 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -93,7 +93,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore
impl
                     final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
                     final long timestamp = WindowKeySchema.extractStoreTimestamp(binaryWindowKey);
 
-                    final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey,
windowSize, serdes);
+                    final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey,
windowSize, serdes.keyDeserializer(), serdes.topic());
                     final Bytes key = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(binaryWindowKey));
                     maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
                     underlying.put(key, entry.newValue(), timestamp);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
index a48c81a..1cba018 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
@@ -56,7 +56,7 @@ class MergedSortedCacheWindowStoreKeyValueIterator
     @Override
     Windowed<Bytes> deserializeCacheKey(final Bytes cacheKey) {
         final byte[] binaryKey = cacheFunction.key(cacheKey).get();
-        return WindowKeySchema.fromStoreKey(binaryKey, windowSize, serdes);
+        return WindowKeySchema.fromStoreKey(binaryKey, windowSize, serdes.keyDeserializer(),
serdes.topic());
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 3c59cd6..e4d9958 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -203,13 +203,14 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema
{
 
     public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
                                                final long windowSize,
-                                               final StateSerdes<K, ?> serdes) {
-        final K key = serdes.keyDeserializer().deserialize(serdes.topic(), extractStoreKeyBytes(binaryKey));
+                                               final Deserializer<K> deserializer,
+                                               final String topic) {
+        final K key = deserializer.deserialize(topic, extractStoreKeyBytes(binaryKey));
         final Window window = extractStoreWindow(binaryKey, windowSize);
         return new Windowed<>(key, window);
     }
 
-    public static Windowed<Bytes> fromStoreKey(final byte[] binaryKey,
+    public static Windowed<Bytes> fromStoreBytesKey(final byte[] binaryKey,
                                                final long windowSize) {
         final Bytes key = Bytes.wrap(extractStoreKeyBytes(binaryKey));
         final Window window = extractStoreWindow(binaryKey, windowSize);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 93e3452..d03f17b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -477,7 +477,7 @@ public class RocksDBSegmentedBytesStoreTest {
             final KeyValue<Bytes, byte[]> next = iterator.next();
             if (schema instanceof WindowKeySchema) {
                 final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
-                    WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow,
stateSerdes),
+                    WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow,
stateSerdes.keyDeserializer(), stateSerdes.topic()),
                     stateSerdes.valueDeserializer().deserialize("dummy", next.value)
                 );
                 results.add(deserialized);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index 9f617f2..8eaf0d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -201,29 +201,49 @@ public class WindowKeySchemaTest {
     public void shouldSerializeDeserializeExpectedWindowSize() {
         final byte[] bytes = keySerde.serializer().serialize(topic, windowedKey);
         final Windowed<String> result = new TimeWindowedDeserializer<>(serde.deserializer(),
endTime - startTime)
-                .deserialize(topic, bytes);
+            .deserialize(topic, bytes);
         assertEquals(windowedKey, result);
     }
 
     @Test
+    public void shouldSerializeDeserializeExpectedChangelogWindowSize() {
+        // Key-value containing serialized store key binary and the key's window size
+        final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(
+            KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new
byte[]{0}), new TimeWindow(0, 1)), 0), 1),
+            KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new
byte[]{0, 0}), new TimeWindow(0, 10)), 0), 10),
+            KeyValue.pair(WindowKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new
byte[]{0, 0, 0}), new TimeWindow(10, 30)), 6), 20));
+
+        final List<Long> results = new ArrayList<>();
+        for (final KeyValue<Bytes, Integer> keyValue : keys) {
+            // Let the deserializer know that it's deserializing a changelog windowed key
+            final Serde<Windowed<String>> keySerde = new WindowedSerdes.TimeWindowedSerde<>(serde,
keyValue.value).forChangelog(true);
+            final Windowed<String> result = keySerde.deserializer().deserialize(topic,
keyValue.key.get());
+            final Window resultWindow = result.window();
+            results.add(resultWindow.end() - resultWindow.start());
+        }
+
+        assertThat(results, equalTo(Arrays.asList(1L, 10L, 20L)));
+    }
+
+    @Test
     public void shouldSerializeNullToNull() {
         assertNull(keySerde.serializer().serialize(topic, null));
     }
 
     @Test
-    public void shouldDeSerializeEmtpyByteArrayToNull() {
+    public void shouldDeserializeEmptyByteArrayToNull() {
         assertNull(keySerde.deserializer().deserialize(topic, new byte[0]));
     }
 
     @Test
-    public void shouldDeSerializeNullToNull() {
+    public void shouldDeserializeNullToNull() {
         assertNull(keySerde.deserializer().deserialize(topic, null));
     }
 
     @Test
     public void shouldConvertToBinaryAndBack() {
         final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
-        final Windowed<String> result = WindowKeySchema.fromStoreKey(serialized.get(),
endTime - startTime, stateSerdes);
+        final Windowed<String> result = WindowKeySchema.fromStoreKey(serialized.get(),
endTime - startTime, stateSerdes.keyDeserializer(), stateSerdes.topic());
         assertEquals(windowedKey, result);
     }
 
@@ -240,7 +260,7 @@ public class WindowKeySchemaTest {
     }
 
     @Test
-    public void shouldExtractWindowFromBindary() {
+    public void shouldExtractWindowFromBinary() {
         final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
         assertEquals(window, WindowKeySchema.extractStoreWindow(serialized.get(), endTime
- startTime));
     }
@@ -254,13 +274,13 @@ public class WindowKeySchemaTest {
     @Test
     public void shouldExtractKeyFromBinary() {
         final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
-        assertEquals(windowedKey, WindowKeySchema.fromStoreKey(serialized.get(), endTime
- startTime, stateSerdes));
+        assertEquals(windowedKey, WindowKeySchema.fromStoreKey(serialized.get(), endTime
- startTime, stateSerdes.keyDeserializer(), stateSerdes.topic()));
     }
 
     @Test
     public void shouldExtractBytesKeyFromBinary() {
         final Windowed<Bytes> windowedBytesKey = new Windowed<>(Bytes.wrap(key.getBytes()),
window);
         final Bytes serialized = WindowKeySchema.toStoreKeyBinary(windowedBytesKey, 0);
-        assertEquals(windowedBytesKey, WindowKeySchema.fromStoreKey(serialized.get(), endTime
- startTime));
+        assertEquals(windowedBytesKey, WindowKeySchema.fromStoreBytesKey(serialized.get(),
endTime - startTime));
     }
 }


Mime
View raw message