kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3639; Configure default serdes upon construction
Date Wed, 04 May 2016 23:23:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk da7095f36 -> c8c6ac3f6


KAFKA-3639; Configure default serdes upon construction

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Michael G. Noll <michael@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Ismael Juma <ismael@juma.me.uk>

Closes #1311 from guozhangwang/K3639


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c8c6ac3f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c8c6ac3f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c8c6ac3f

Branch: refs/heads/trunk
Commit: c8c6ac3f6d2d590261acb35fabbf7418ae102d4e
Parents: da7095f
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu May 5 00:23:34 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu May 5 00:23:34 2016 +0100

----------------------------------------------------------------------
 .../kafka/common/serialization/Serde.java       |  24 ++++-
 .../kafka/common/serialization/Serdes.java      | 103 ++++++++-----------
 .../org/apache/kafka/streams/StreamsConfig.java |  10 +-
 .../apache/kafka/streams/StreamsConfigTest.java |  32 +++++-
 4 files changed, 99 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ac3f/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java
index cc7944e..42b8c1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java
@@ -13,12 +13,32 @@
 
 package org.apache.kafka.common.serialization;
 
+import java.io.Closeable;
+import java.util.Map;
+
 /**
  * The interface for wrapping a serializer and deserializer for the given data type.
  *
- * @param <T>
+ * @param <T> Type to be serialized from and deserialized into.
+ *
+ * A class that implements this interface is expected to have a constructor with no parameter.
  */
-public interface Serde<T> {
+public interface Serde<T> extends Closeable {
+
+    /**
+     * Configure this class, which will configure the underlying serializer and deserializer.
+     *
+     * @param configs configs in key/value pairs
+     * @param isKey whether is for key or value
+     */
+    void configure(Map<String, ?> configs, boolean isKey);
+
+    /**
+     * Close this serde class, which will close the underlying serializer and deserializer.
+     * This method has to be idempotent because it might be called multiple times.
+     */
+    @Override
+    void close();
 
     Serializer<T> serializer();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ac3f/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index d744522..9075a93 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -16,93 +16,84 @@ package org.apache.kafka.common.serialization;
 import org.apache.kafka.common.utils.Bytes;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 /**
  * Factory for creating serializers / deserializers.
  */
 public class Serdes {
 
-    static public final class LongSerde implements Serde<Long> {
-        @Override
-        public Serializer<Long> serializer() {
-            return new LongSerializer();
-        }
+    static private class WrapperSerde<T> implements Serde<T> {
+        final private Serializer<T> serializer;
+        final private Deserializer<T> deserializer;
 
-        @Override
-        public Deserializer<Long> deserializer() {
-            return new LongDeserializer();
+        public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer)
{
+            this.serializer = serializer;
+            this.deserializer = deserializer;
         }
-    }
 
-    static public final class IntegerSerde implements Serde<Integer> {
         @Override
-        public Serializer<Integer> serializer() {
-            return new IntegerSerializer();
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            serializer.configure(configs, isKey);
+            deserializer.configure(configs, isKey);
         }
 
         @Override
-        public Deserializer<Integer> deserializer() {
-            return new IntegerDeserializer();
+        public void close() {
+            serializer.close();
+            deserializer.close();
         }
-    }
 
-    static public final class DoubleSerde implements Serde<Double> {
         @Override
-        public Serializer<Double> serializer() {
-            return new DoubleSerializer();
+        public Serializer<T> serializer() {
+            return serializer;
         }
 
         @Override
-        public Deserializer<Double> deserializer() {
-            return new DoubleDeserializer();
+        public Deserializer<T> deserializer() {
+            return deserializer;
         }
     }
 
-    static public final class StringSerde implements Serde<String> {
-        @Override
-        public Serializer<String> serializer() {
-            return new StringSerializer();
-        }
-
-        @Override
-        public Deserializer<String> deserializer() {
-            return new StringDeserializer();
+    static public final class LongSerde extends WrapperSerde<Long> {
+        public LongSerde() {
+            super(new LongSerializer(), new LongDeserializer());
         }
     }
 
-    static public final class ByteBufferSerde implements Serde<ByteBuffer> {
-        @Override
-        public Serializer<ByteBuffer> serializer() {
-            return new ByteBufferSerializer();
+    static public final class IntegerSerde extends WrapperSerde<Integer> {
+        public IntegerSerde() {
+            super(new IntegerSerializer(), new IntegerDeserializer());
         }
+    }
 
-        @Override
-        public Deserializer<ByteBuffer> deserializer() {
-            return new ByteBufferDeserializer();
+    static public final class DoubleSerde extends WrapperSerde<Double> {
+        public DoubleSerde() {
+            super(new DoubleSerializer(), new DoubleDeserializer());
         }
     }
 
-    static public final class BytesSerde implements Serde<Bytes> {
-        @Override
-        public Serializer<Bytes> serializer() {
-            return new BytesSerializer();
+    static public final class StringSerde extends WrapperSerde<String> {
+        public StringSerde() {
+            super(new StringSerializer(), new StringDeserializer());
         }
+    }
 
-        @Override
-        public Deserializer<Bytes> deserializer() {
-            return new BytesDeserializer();
+    static public final class ByteBufferSerde extends WrapperSerde<ByteBuffer> {
+        public ByteBufferSerde() {
+            super(new ByteBufferSerializer(), new ByteBufferDeserializer());
         }
     }
 
-    static public final class ByteArraySerde implements Serde<byte[]> {
-        @Override
-        public Serializer<byte[]> serializer() {
-            return new ByteArraySerializer();
+    static public final class BytesSerde extends WrapperSerde<Bytes> {
+        public BytesSerde() {
+            super(new BytesSerializer(), new BytesDeserializer());
         }
+    }
 
-        @Override
-        public Deserializer<byte[]> deserializer() {
-            return new ByteArrayDeserializer();
+    static public final class ByteArraySerde extends WrapperSerde<byte[]> {
+        public ByteArraySerde() {
+            super(new ByteArraySerializer(), new ByteArrayDeserializer());
         }
     }
 
@@ -154,17 +145,7 @@ public class Serdes {
             throw new IllegalArgumentException("deserializer must not be null");
         }
 
-        return new Serde<T>() {
-            @Override
-            public Serializer<T> serializer() {
-                return serializer;
-            }
-
-            @Override
-            public Deserializer<T> deserializer() {
-                return deserializer;
-            }
-        };
+        return new WrapperSerde<>(serializer, deserializer);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ac3f/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 99eb58f..fac2914 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -300,11 +300,17 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     public Serde keySerde() {
-        return getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
+        Serde<?> serde = getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serde.class);
+        serde.configure(originals(), true);
+
+        return serde;
     }
 
     public Serde valueSerde() {
-        return getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serde.class);
+        Serde<?> serde = getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serde.class);
+        serde.configure(originals(), false);
+
+        return serde;
     }
 
     public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ac3f/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 0dacde7..81b406f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -18,10 +18,12 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import static org.junit.Assert.assertEquals;
@@ -31,13 +33,15 @@ public class StreamsConfigTest {
 
     private Properties props = new Properties();
     private StreamsConfig streamsConfig;
-    private StreamThread streamThreadPlaceHolder;
-
 
     @Before
     public void setUp() {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.put("key.deserializer.encoding", "UTF8");
+        props.put("value.deserializer.encoding", "UTF-16");
         streamsConfig = new StreamsConfig(props);
     }
 
@@ -49,8 +53,7 @@ public class StreamsConfigTest {
 
     @Test
     public void testGetConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps =
-            streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-application",
"client");
+        Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null,
"example-application", "client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
 
@@ -62,4 +65,23 @@ public class StreamsConfigTest {
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
     }
+
+    @Test
+    public void defaultSerdeShouldBeConfigured() {
+        Map<String, Object> serializerConfigs = new HashMap<String, Object>();
+        serializerConfigs.put("key.serializer.encoding", "UTF8");
+        serializerConfigs.put("value.serializer.encoding", "UTF-16");
+        Serializer<String> serializer = Serdes.String().serializer();
+
+        String str = "my string for testing";
+        String topic = "my topic";
+
+        serializer.configure(serializerConfigs, true);
+        assertEquals("Should get the original string after serialization and deserialization
with the configured encoding",
+                str, streamsConfig.keySerde().deserializer().deserialize(topic, serializer.serialize(topic,
str)));
+
+        serializer.configure(serializerConfigs, false);
+        assertEquals("Should get the original string after serialization and deserialization
with the configured encoding",
+                str, streamsConfig.valueSerde().deserializer().deserialize(topic, serializer.serialize(topic,
str)));
+    }
 }


Mime
View raw message