kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9032: Bypass serdes for tombstones (#7518)
Date Wed, 16 Oct 2019 16:35:52 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new f2495e6  KAFKA-9032: Bypass serdes for tombstones (#7518)
f2495e6 is described below

commit f2495e6fc81a4bfcc9ce0908643fb1aa63d3dcd3
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Wed Oct 16 11:34:52 2019 -0500

    KAFKA-9032: Bypass serdes for tombstones (#7518)
    
    In a KTable context, null record values have a special "tombstone" significance. We should
always bypass the serdes for such tombstones, since otherwise the serde could violate Streams'
table semantics.
    
    Added test coverage for this case and fixed the code accordingly.
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bill@confluent.io>
---
 .../SubscriptionResponseWrapperSerde.java          | 12 +++--
 .../SubscriptionResponseWrapperSerdeTest.java      | 63 ++++++++++++++++++----
 2 files changed, 60 insertions(+), 15 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 6524b4f..4277f9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -58,7 +58,7 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
                 throw new UnsupportedVersionException("SubscriptionResponseWrapper version
is larger than maximum supported 0x7F");
             }
 
-            final byte[] serializedData = serializer.serialize(topic, data.getForeignValue());
+            final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic,
data.getForeignValue());
             final int serializedDataLength = serializedData == null ? 0 : serializedData.length;
             final long[] originalHash = data.getOriginalValueHash();
             final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
@@ -108,14 +108,16 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
                 lengthSum += 2 * Long.BYTES;
             }
 
-            final byte[] serializedValue;
+            final V value;
             if (data.length - lengthSum > 0) {
+                final byte[] serializedValue;
                 serializedValue = new byte[data.length - lengthSum];
                 buf.get(serializedValue, 0, serializedValue.length);
-            } else
-                serializedValue = null;
+                value = deserializer.deserialize(topic, serializedValue);
+            } else {
+                value = null;
+            }
 
-            final V value = deserializer.deserialize(topic, serializedValue);
             return new SubscriptionResponseWrapper<>(hash, value, version);
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index fde9bdd..40948e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -17,15 +17,58 @@
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.state.internals.Murmur3;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class SubscriptionResponseWrapperSerdeTest {
+    private static final class NonNullableSerde<T> implements Serde<T>, Serializer<T>,
Deserializer<T> {
+        private final Serde<T> delegate;
+
+        NonNullableSerde(final Serde<T> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public Serializer<T> serializer() {
+            return this;
+        }
+
+        @Override
+        public Deserializer<T> deserializer() {
+            return this;
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final T data) {
+            return delegate.serializer().serialize(topic, requireNonNull(data));
+        }
+
+        @Override
+        public T deserialize(final String topic, final byte[] data) {
+            return delegate.deserializer().deserialize(topic, requireNonNull(data));
+        }
+    }
 
     @Test
     @SuppressWarnings("unchecked")
@@ -33,9 +76,9 @@ public class SubscriptionResponseWrapperSerdeTest {
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A,
(byte) 0xFF, (byte) 0x00});
         final String foreignValue = "foreignValue";
         final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue,
foreignValue);
-        final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(Serdes.String());
+        final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new
NonNullableSerde(Serdes.String()));
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = (SubscriptionResponseWrapper<String>)
srwSerde.deserializer().deserialize(null, serResponse);
+        final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null,
serResponse);
 
         assertArrayEquals(hashedValue, result.getOriginalValueHash());
         assertEquals(foreignValue, result.getForeignValue());
@@ -46,9 +89,9 @@ public class SubscriptionResponseWrapperSerdeTest {
     public void shouldSerdeWithNullForeignValueTest() {
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A,
(byte) 0xFF, (byte) 0x00});
         final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue,
null);
-        final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(Serdes.String());
+        final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new
NonNullableSerde(Serdes.String()));
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = (SubscriptionResponseWrapper<String>)
srwSerde.deserializer().deserialize(null, serResponse);
+        final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null,
serResponse);
 
         assertArrayEquals(hashedValue, result.getOriginalValueHash());
         assertNull(result.getForeignValue());
@@ -60,9 +103,9 @@ public class SubscriptionResponseWrapperSerdeTest {
         final long[] hashedValue = null;
         final String foreignValue = "foreignValue";
         final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue,
foreignValue);
-        final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(Serdes.String());
+        final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new
NonNullableSerde(Serdes.String()));
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = (SubscriptionResponseWrapper<String>)
srwSerde.deserializer().deserialize(null, serResponse);
+        final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null,
serResponse);
 
         assertArrayEquals(hashedValue, result.getOriginalValueHash());
         assertEquals(foreignValue, result.getForeignValue());
@@ -74,15 +117,15 @@ public class SubscriptionResponseWrapperSerdeTest {
         final long[] hashedValue = null;
         final String foreignValue = null;
         final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue,
foreignValue);
-        final SubscriptionResponseWrapperSerde srwSerde = new SubscriptionResponseWrapperSerde(Serdes.String());
+        final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new
NonNullableSerde(Serdes.String()));
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = (SubscriptionResponseWrapper<String>)
srwSerde.deserializer().deserialize(null, serResponse);
+        final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null,
serResponse);
 
         assertArrayEquals(hashedValue, result.getOriginalValueHash());
         assertEquals(foreignValue, result.getForeignValue());
     }
 
-    @Test (expected = UnsupportedVersionException.class)
+    @Test(expected = UnsupportedVersionException.class)
     @SuppressWarnings("unchecked")
     public void shouldThrowExceptionWithBadVersionTest() {
         final long[] hashedValue = null;


Mime
View raw message