kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3739: Add no-arg constructor for WindowedSerdes in Streams
Date Fri, 13 Jan 2017 19:13:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d1956d4b6 -> fc0d612b0


KAFKA-3739: Add no-arg constructor for WindowedSerdes in Streams

Add default constructor for library provided serdes

Author: huxi <huxi@zhenrongbao.com>
Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2308 from amethystic/kafka3739_add_noarg_constructor_for_streaming_serdes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fc0d612b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fc0d612b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fc0d612b

Branch: refs/heads/trunk
Commit: fc0d612b0806032675b433a48f56ccd04f7e52cf
Parents: d1956d4
Author: Xi Hu <huxi@zhenrongbao.com>
Authored: Fri Jan 13 11:13:16 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jan 13 11:13:16 2017 -0800

----------------------------------------------------------------------
 .../kstream/internals/WindowedDeserializer.java | 30 ++++++++++-
 .../kstream/internals/WindowedSerializer.java   | 29 +++++++++-
 .../WindowedStreamPartitionerTest.java          | 57 ++++++++++++++++++++
 3 files changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0d612b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
index 96c3668..d85fb70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
@@ -17,25 +17,48 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Windowed;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
+/**
+ *  The inner deserializer class can be specified by setting the property key.deserializer.inner.class,
+ *  value.deserializer.inner.class or deserializer.inner.class,
+ *  if the no-arg constructor is called and hence it is not passed during initialization.
+ *  Note that the first two take precedence over the last.
+ */
 public class WindowedDeserializer<T> implements Deserializer<Windowed<T>> {
 
     private static final int TIMESTAMP_SIZE = 8;
 
     private Deserializer<T> inner;
 
+    // Default constructor needed by Kafka
+    public WindowedDeserializer() {}
+
     public WindowedDeserializer(Deserializer<T> inner) {
         this.inner = inner;
     }
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
-        // do nothing
+        if (inner == null) {
+            String propertyName = isKey ? "key.deserializer.inner.class" : "value.deserializer.inner.class";
+            Object innerDeserializerClass = configs.get(propertyName);
+            propertyName = (innerDeserializerClass == null) ? "deserializer.inner.class" : propertyName;
+            String value = null;
+            try {
+                value = (String) configs.get(propertyName);
+                inner = Deserializer.class.cast(Utils.newInstance(value, Deserializer.class));
+                inner.configure(configs, isKey);
+            } catch (ClassNotFoundException e) {
+                throw new ConfigException(propertyName, value, "Class " + value + " could not be found.");
+            }
+        }
     }
 
     @Override
@@ -56,4 +79,9 @@ public class WindowedDeserializer<T> implements Deserializer<Windowed<T>> {
     public void close() {
         inner.close();
     }
+
+    // Only for testing
+    public Deserializer<T> innerDeserializer() {
+        return inner;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0d612b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
index 2e19816..8cac7aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -17,12 +17,20 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Windowed;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
+/**
+ *  The inner serializer class can be specified by setting the property key.serializer.inner.class,
+ *  value.serializer.inner.class or serializer.inner.class,
+ *  if the no-arg constructor is called and hence it is not passed during initialization.
+ *  Note that the first two take precedence over the last.
+ */
 public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
 
     private static final int TIMESTAMP_SIZE = 8;
@@ -33,9 +41,24 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
         this.inner = inner;
     }
 
+    // Default constructor needed by Kafka
+    public WindowedSerializer() {}
+
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
-        // do nothing
+        if (inner == null) {
+            String propertyName = isKey ? "key.serializer.inner.class" : "value.serializer.inner.class";
+            Object innerSerializerClass = configs.get(propertyName);
+            propertyName = (innerSerializerClass == null) ? "serializer.inner.class" : propertyName;
+            String value = null;
+            try {
+                value = (String) configs.get(propertyName);
+                inner = Serializer.class.cast(Utils.newInstance(value, Serializer.class));
+                inner.configure(configs, isKey);
+            } catch (ClassNotFoundException e) {
+                throw new ConfigException(propertyName, value, "Class " + value + " could not be found.");
+            }
+        }
     }
 
     @Override
@@ -58,4 +81,8 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
         return inner.serialize(topic, data.key());
     }
 
+    // Only for testing
+    public Serializer<T> innerSerializer() {
+        return inner;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0d612b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index cd45aee..4be83be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -21,8 +21,14 @@ import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.junit.Test;
 
@@ -30,8 +36,12 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.Map;
+import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class WindowedStreamPartitionerTest {
 
@@ -82,4 +92,51 @@ public class WindowedStreamPartitionerTest {
         }
     }
 
+    @Test
+    public void testWindowedSerializerNoArgConstructors() {
+        Map<String, String> props = new HashMap<>();
+        // test key[value].serializer.inner.class takes precedence over serializer.inner.class
+        WindowedSerializer<StringSerializer> windowedSerializer = new WindowedSerializer<>();
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.put("key.serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer");
+        windowedSerializer.configure(props, true);
+        Serializer<?> inner = windowedSerializer.innerSerializer();
+        assertNotNull("Inner serializer should be not null", inner);
+        assertTrue("Inner serializer type should be StringSerializer", inner instanceof StringSerializer);
+        // test serializer.inner.class
+        props.put("serializer.inner.class", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.remove("key.serializer.inner.class");
+        props.remove("value.serializer.inner.class");
+        WindowedSerializer<?> windowedSerializer1 = new WindowedSerializer<>();
+        windowedSerializer1.configure(props, false);
+        Serializer<?> inner1 = windowedSerializer1.innerSerializer();
+        assertNotNull("Inner serializer should be not null", inner1);
+        assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer);
+    }
+
+    @Test
+    public void testWindowedDeserializerNoArgConstructors() {
+        Map<String, String> props = new HashMap<>();
+        // test key[value].deserializer.inner.class takes precedence over serializer.inner.class
+        WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>();
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
+        windowedDeserializer.configure(props, true);
+        Deserializer<?> inner = windowedDeserializer.innerDeserializer();
+        assertNotNull("Inner deserializer should be not null", inner);
+        assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer);
+        // test deserializer.inner.class
+        props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.remove("key.deserializer.inner.class");
+        props.remove("value.deserializer.inner.class");
+        WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>();
+        windowedDeserializer1.configure(props, false);
+        Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
+        assertNotNull("Inner deserializer should be not null", inner1);
+        assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer);
+    }
 }


Mime
View raw message