kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6161 Add default implementation to close() and configure() for Serdes (#5348)
Date Thu, 21 Feb 2019 17:05:24 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 35a0de3  KAFKA-6161 Add default implementation to close() and configure() for Serdes (#5348)
35a0de3 is described below

commit 35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0
Author: Chia-Ping Tsai <chia7712@gmail.com>
AuthorDate: Fri Feb 22 01:05:13 2019 +0800

    KAFKA-6161 Add default implementation to close() and configure() for Serdes (#5348)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/common/serialization/ByteArrayDeserializer.java  | 12 ------------
 .../kafka/common/serialization/ByteArraySerializer.java    | 13 -------------
 .../kafka/common/serialization/ByteBufferDeserializer.java | 10 ----------
 .../kafka/common/serialization/ByteBufferSerializer.java   | 10 ----------
 .../kafka/common/serialization/BytesDeserializer.java      | 11 -----------
 .../apache/kafka/common/serialization/BytesSerializer.java | 11 -----------
 .../apache/kafka/common/serialization/Deserializer.java    | 13 +++++++++++--
 .../kafka/common/serialization/DoubleDeserializer.java     | 12 ------------
 .../kafka/common/serialization/DoubleSerializer.java       | 13 -------------
 .../kafka/common/serialization/FloatDeserializer.java      | 14 --------------
 .../apache/kafka/common/serialization/FloatSerializer.java | 13 -------------
 .../kafka/common/serialization/IntegerDeserializer.java    | 11 -----------
 .../kafka/common/serialization/IntegerSerializer.java      | 11 -----------
 .../kafka/common/serialization/LongDeserializer.java       | 11 -----------
 .../apache/kafka/common/serialization/LongSerializer.java  | 11 -----------
 .../java/org/apache/kafka/common/serialization/Serde.java  |  9 +++++++--
 .../org/apache/kafka/common/serialization/Serializer.java  | 10 +++++++---
 .../kafka/common/serialization/ShortDeserializer.java      | 10 ----------
 .../apache/kafka/common/serialization/ShortSerializer.java | 11 -----------
 .../kafka/common/serialization/StringDeserializer.java     |  5 -----
 .../kafka/common/serialization/StringSerializer.java       |  5 -----
 .../kafka/common/serialization/UUIDDeserializer.java       |  5 -----
 .../apache/kafka/common/serialization/UUIDSerializer.java  |  5 -----
 .../test/java/org/apache/kafka/test/MockSerializer.java    |  5 -----
 .../org/apache/kafka/connect/json/JsonDeserializer.java    | 10 ----------
 .../java/org/apache/kafka/connect/json/JsonSerializer.java | 11 -----------
 .../test/scala/kafka/tools/CustomDeserializerTest.scala    |  5 -----
 docs/streams/upgrade-guide.html                            |  5 +++++
 .../src/main/java/org/apache/kafka/streams/Topology.java   |  4 +++-
 .../streams/kstream/internals/ChangedDeserializer.java     |  6 ------
 .../kafka/streams/kstream/internals/ChangedSerializer.java |  6 ------
 .../java/org/apache/kafka/streams/StreamsConfigTest.java   |  3 ---
 .../java/org/apache/kafka/streams/perf/YahooBenchmark.java | 12 ------------
 .../kafka/streams/processor/internals/SourceNodeTest.java  |  7 -------
 .../streams/state/internals/SerdeThatDoesntHandleNull.java | 12 ------------
 .../org/apache/kafka/streams/TopologyTestDriverTest.java   |  8 --------
 .../org/apache/kafka/tools/ClientCompatibilityTest.java    | 10 ----------
 37 files changed, 33 insertions(+), 307 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java
index 2672115..1147f45 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java
@@ -16,22 +16,10 @@
  */
 package org.apache.kafka.common.serialization;
 
-import java.util.Map;
-
 public class ByteArrayDeserializer implements Deserializer<byte[]> {
 
     @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
-    @Override
     public byte[] deserialize(String topic, byte[] data) {
         return data;
     }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java
index d069e94..6bebaa6 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java
@@ -16,22 +16,9 @@
  */
 package org.apache.kafka.common.serialization;
 
-import java.util.Map;
-
 public class ByteArraySerializer implements Serializer<byte[]> {
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     @Override
     public byte[] serialize(String topic, byte[] data) {
         return data;
     }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
index d41f03c..0dfcf5f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
@@ -17,22 +17,12 @@
 package org.apache.kafka.common.serialization;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
 
 public class ByteBufferDeserializer implements Deserializer<ByteBuffer> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     public ByteBuffer deserialize(String topic, byte[] data) {
         if (data == null)
             return null;
 
         return ByteBuffer.wrap(data);
     }
-
-    public void close() {
-        // nothing to do
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
index c8c3692..9fb1254 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
@@ -17,14 +17,8 @@
 package org.apache.kafka.common.serialization;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
 
 public class ByteBufferSerializer implements Serializer<ByteBuffer> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     public byte[] serialize(String topic, ByteBuffer data) {
         if (data == null)
             return null;
@@ -43,8 +37,4 @@ public class ByteBufferSerializer implements Serializer<ByteBuffer> {
         data.rewind();
         return ret;
     }
-
-    public void close() {
-        // nothing to do
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
index 66b07eb..1350dca 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
@@ -18,22 +18,11 @@ package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.utils.Bytes;
 
-import java.util.Map;
-
 public class BytesDeserializer implements Deserializer<Bytes> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     public Bytes deserialize(String topic, byte[] data) {
         if (data == null)
             return null;
 
         return new Bytes(data);
     }
-
-    public void close() {
-        // nothing to do
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
index 0dc4476..62ea6ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
@@ -18,23 +18,12 @@ package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.utils.Bytes;
 
-import java.util.Map;
-
 public class BytesSerializer implements Serializer<Bytes> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     public byte[] serialize(String topic, Bytes data) {
         if (data == null)
             return null;
 
         return data.get();
     }
-
-    public void close() {
-        // nothing to do
-    }
 }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index bc1a714..eb56485 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -37,7 +37,9 @@ public interface Deserializer<T> extends Closeable {
      * @param configs configs in key/value pairs
      * @param isKey whether is for key or value
      */
-    void configure(Map<String, ?> configs, boolean isKey);
+    default void configure(Map<String, ?> configs, boolean isKey) {
+        // intentionally left blank
+    }
 
     /**
      * Deserialize a record value from a byte array into a value or object.
@@ -58,6 +60,13 @@ public interface Deserializer<T> extends Closeable {
         return deserialize(topic, data);
     }
 
+    /**
+     * Close this deserializer.
+     * <p>
+     * This method must be idempotent as it may be called multiple times.
+     */
     @Override
-    void close();
+    default void close() {
+        // intentionally left blank
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
index 24f6007..0fa1cce 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java
@@ -18,16 +18,9 @@ package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
 
-import java.util.Map;
-
 public class DoubleDeserializer implements Deserializer<Double> {
 
     @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
-    @Override
     public Double deserialize(String topic, byte[] data) {
         if (data == null)
             return null;
@@ -42,9 +35,4 @@ public class DoubleDeserializer implements Deserializer<Double> {
         }
         return Double.longBitsToDouble(value);
     }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java
index 7dd4edc..99781b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java
@@ -16,15 +16,7 @@
  */
 package org.apache.kafka.common.serialization;
 
-import java.util.Map;
-
 public class DoubleSerializer implements Serializer<Double> {
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     @Override
     public byte[] serialize(String topic, Double data) {
         if (data == null)
@@ -42,9 +34,4 @@ public class DoubleSerializer implements Serializer<Double> {
             (byte) bits
         };
     }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
index 3834ce2..0903177 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
@@ -18,15 +18,7 @@ package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
 
-import java.util.Map;
-
 public class FloatDeserializer implements Deserializer<Float> {
-
-    @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-        // nothing to do
-    }
-
     @Override
     public Float deserialize(final String topic, final byte[] data) {
         if (data == null)
@@ -42,10 +34,4 @@ public class FloatDeserializer implements Deserializer<Float> {
         }
         return Float.intBitsToFloat(value);
     }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
-
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
index 6eb766d..aa72d43 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
@@ -16,15 +16,7 @@
  */
 package org.apache.kafka.common.serialization;
 
-import java.util.Map;
-
 public class FloatSerializer implements Serializer<Float> {
-
-    @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-        // nothing to do
-    }
-
     @Override
     public byte[] serialize(final String topic, final Float data) {
         if (data == null)
@@ -38,9 +30,4 @@ public class FloatSerializer implements Serializer<Float> {
             (byte) bits
         };
     }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
index 45f8cf1..20ca63f 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java
@@ -18,14 +18,7 @@ package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
 
-import java.util.Map;
-
 public class IntegerDeserializer implements Deserializer<Integer> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     public Integer deserialize(String topic, byte[] data) {
         if (data == null)
             return null;
@@ -40,8 +33,4 @@ public class IntegerDeserializer implements Deserializer<Integer> {
         }
         return value;
     }
-
-    public void close() {
-        // nothing to do
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java
index f2144ce..8ab5310 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java
@@ -16,14 +16,7 @@
  */
 package org.apache.kafka.common.serialization;
 
-import java.util.Map;
-
 public class IntegerSerializer implements Serializer<Integer> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     public byte[] serialize(String topic, Integer data) {
         if (data == null)
             return null;
@@ -35,8 +28,4 @@ public class IntegerSerializer implements Serializer<Integer> {
             data.byteValue()
         };
     }
-
-    public void close() {
-        // nothing to do
-    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
index a58b1d3..1e445d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java
@@ -18,14 +18,7 @@ package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
 
-import java.util.Map;
-
 public class LongDeserializer implements Deserializer<Long> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     public Long deserialize(String topic, byte[] data) {
         if (data == null)
             return null;
@@ -40,8 +33,4 @@ public class LongDeserializer implements Deserializer<Long> {
         }
         return value;
     }
-
-    public void close() {
-        // nothing to do
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java
index d37842c..436f0e0 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java
@@ -16,14 +16,7 @@
  */
 package org.apache.kafka.common.serialization;
 
-import java.util.Map;
-
 public class LongSerializer implements Serializer<Long> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     public byte[] serialize(String topic, Long data) {
         if (data == null)
             return null;
@@ -39,8 +32,4 @@ public class LongSerializer implements Serializer<Long> {
             data.byteValue()
         };
     }
-
-    public void close() {
-        // nothing to do
-    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java
index fbcc7c2..5b052e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java
@@ -34,14 +34,19 @@ public interface Serde<T> extends Closeable {
      * @param configs configs in key/value pairs
      * @param isKey whether is for key or value
      */
-    void configure(Map<String, ?> configs, boolean isKey);
+    default void configure(Map<String, ?> configs, boolean isKey) {
+        // intentionally left blank
+    }
 
     /**
      * Close this serde class, which will close the underlying serializer and deserializer.
+     * <p>
      * This method has to be idempotent because it might be called multiple times.
      */
     @Override
-    void close();
+    default void close() {
+        // intentionally left blank
+    }
 
     Serializer<T> serializer();
 
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index c5d4760..144b5ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -37,7 +37,9 @@ public interface Serializer<T> extends Closeable {
      * @param configs configs in key/value pairs
      * @param isKey whether is for key or value
      */
-    void configure(Map<String, ?> configs, boolean isKey);
+    default void configure(Map<String, ?> configs, boolean isKey) {
+        // intentionally left blank
+    }
 
     /**
      * Convert {@code data} into a byte array.
@@ -62,9 +64,11 @@ public interface Serializer<T> extends Closeable {
 
     /**
      * Close this serializer.
-     *
+     * <p>
      * This method must be idempotent as it may be called multiple times.
      */
     @Override
-    void close();
+    default void close() {
+        // intentionally left blank
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
index 45aa8ae..7814a7b 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
@@ -18,14 +18,8 @@ package org.apache.kafka.common.serialization;
 
 import org.apache.kafka.common.errors.SerializationException;
 
-import java.util.Map;
-
 public class ShortDeserializer implements Deserializer<Short> {
 
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     public Short deserialize(String topic, byte[] data) {
         if (data == null)
             return null;
@@ -40,8 +34,4 @@ public class ShortDeserializer implements Deserializer<Short> {
         }
         return value;
     }
-
-    public void close() {
-        // nothing to do
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
index a66aaa0..e54354b 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java
@@ -16,14 +16,7 @@
  */
 package org.apache.kafka.common.serialization;
 
-import java.util.Map;
-
 public class ShortSerializer implements Serializer<Short> {
-
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // nothing to do
-    }
-
     public byte[] serialize(String topic, Short data) {
         if (data == null)
             return null;
@@ -33,8 +26,4 @@ public class ShortSerializer implements Serializer<Short> {
             data.byteValue()
         };
     }
-
-    public void close() {
-        // nothing to do
-    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
index 0398a1b..68e6c40 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
@@ -49,9 +49,4 @@ public class StringDeserializer implements Deserializer<String> {
             throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
         }
     }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
index 28e4174..e16e19a 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
@@ -49,9 +49,4 @@ public class StringSerializer implements Serializer<String> {
             throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
         }
     }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
index a6eb2ea..e852fc9 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
@@ -52,9 +52,4 @@ public class UUIDDeserializer implements Deserializer<UUID> {
             throw new SerializationException("Error parsing data into UUID", e);
         }
     }
-
-    @Override
-    public void close() {
-      // do nothing
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java
index d8e2524..908c202 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java
@@ -50,9 +50,4 @@ public class UUIDSerializer implements Serializer<UUID> {
             throw new SerializationException("Error when serializing UUID to byte[] due to unsupported encoding " + encoding);
         }
     }
-
-    @Override
-    public void close() {
-      // nothing to do
-    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
index 0c597c8..1c14445 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.ClusterResourceListener;
 import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.serialization.Serializer;
 
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -36,10 +35,6 @@ public class MockSerializer implements ClusterResourceListener, Serializer<byte[
     }
 
     @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-    }
-
-    @Override
     public byte[] serialize(String topic, byte[] data) {
         // This will ensure that we get the cluster metadata when serialize is called for the first time
         // as subsequent compareAndSet operations will fail.
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
index 8f2171b..b006e22 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java
@@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Deserializer;
 
-import java.util.Map;
-
 /**
  * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
  * structured data without having associated Java classes. This deserializer also supports Connect schemas.
@@ -36,9 +34,6 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
     public JsonDeserializer() {
     }
 
-    @Override
-    public void configure(Map<String, ?> props, boolean isKey) {
-    }
 
     @Override
     public JsonNode deserialize(String topic, byte[] bytes) {
@@ -54,9 +49,4 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
 
         return data;
     }
-
-    @Override
-    public void close() {
-
-    }
 }
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
index 438daa1..94ec0a8 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java
@@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Serializer;
 
-import java.util.Map;
-
 /**
  * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily
  * structured data without corresponding Java classes. This serializer also supports Connect schemas.
@@ -38,10 +36,6 @@ public class JsonSerializer implements Serializer<JsonNode> {
     }
 
     @Override
-    public void configure(Map<String, ?> config, boolean isKey) {
-    }
-
-    @Override
     public byte[] serialize(String topic, JsonNode data) {
         if (data == null)
             return null;
@@ -52,9 +46,4 @@ public class JsonSerializer implements Serializer<JsonNode> {
             throw new SerializationException("Error serializing JSON message", e);
         }
     }
-
-    @Override
-    public void close() {
-    }
-
 }
diff --git a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
index f94a900..7fb3cf3 100644
--- a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
+++ b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
@@ -27,16 +27,11 @@ import org.junit.Test
 import org.scalatest.mockito.MockitoSugar
 
 class CustomDeserializer extends Deserializer[String] {
-  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
-  }
 
   override def deserialize(topic: String, data: Array[Byte]): String = {
     assertThat("topic must not be null", topic, CoreMatchers.notNullValue)
     new String(data)
   }
-
-  override def close(): Unit = {
-  }
 }
 
 class CustomDeserializerTest extends MockitoSugar {
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b4d957a..9071dc2 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -73,6 +73,11 @@
         As of 2.3.0 Streams now offers an in-memory version of the window store, in addition to the persistent one based on RocksDB. The new public interface <code>inMemoryWindowStore()</code> is added to Stores that provides a built-in in-memory window store.
     </p>
 
+    <p>
+        In 2.3.0 we have added default implementation to close() and configure() for <code>Serializer</code>, <code>Deserializer</code> and <code>Serde</code> so that they can be
+        implemented by lambda expression. For more details please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-331+Add+default+implementation+to+close%28%29+and+configure%28%29+for+Serializer%2C+Deserializer+and+Serde">KIP-331</a>.
+    </p>
+
     <h3><a id="streams_api_changes_220" href="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3>
     <p>
         We've simplified the <code>KafkaStreams#state</code> transition diagram during the starting up phase a bit in 2.2.0: in older versions the state will transit from <code>CREATED</code> to <code>RUNNING</code>, and then to <code>REBALANCING</code> to get the first
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 8b2a46b..d13e4a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -304,6 +304,7 @@ public class Topology {
      * @return itself
      * @throws TopologyException if processor is already added or if topics have already been registered by name
      */
+    @SuppressWarnings("overloads")
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final Deserializer keyDeserializer,
@@ -359,7 +360,7 @@ public class Topology {
      * @return itself
      * @throws TopologyException if processor is already added or if topics have already been registered by another source
      */
-
+    @SuppressWarnings("overloads")
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final TimestampExtractor timestampExtractor,
@@ -391,6 +392,7 @@ public class Topology {
      * @return itself
      * @throws TopologyException if processor is already added or if topics have already been registered by name
      */
+    @SuppressWarnings("overloads")
     public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                            final String name,
                                            final TimestampExtractor timestampExtractor,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 56193d5..36f77b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
 
 public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
 
@@ -41,11 +40,6 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
     }
 
     @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-        // do nothing
-    }
-
-    @Override
     public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
 
         final byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 7fa34b7..bfd0afa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
 
 public class ChangedSerializer<T> implements Serializer<Change<T>> {
 
@@ -41,11 +40,6 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
         this.inner = inner;
     }
 
-    @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-        // do nothing
-    }
-
     /**
      * @throws StreamsException if both old and new values of data are null, or if
      * both values are not null
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index afebfdb..2c9a97b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -646,9 +646,6 @@ public class StreamsConfigTest {
         }
 
         @Override
-        public void close() {}
-
-        @Override
         public Serializer serializer() {
             return null;
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index 8607902..2cab626 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -189,9 +189,6 @@ public class YahooBenchmark {
         public JsonPOJOSerializer() {}
 
         @Override
-        public void configure(final Map<String, ?> props, final boolean isKey) {}
-
-        @Override
         public byte[] serialize(final String topic, final T data) {
             if (data == null) {
                 return null;
@@ -203,10 +200,6 @@ public class YahooBenchmark {
                 throw new SerializationException("Error serializing JSON message", e);
             }
         }
-
-        @Override
-        public void close() {}
-
     }
 
     // Note: these are also in the streams example package, eventuall use 1 file
@@ -242,11 +235,6 @@ public class YahooBenchmark {
 
             return data;
         }
-
-        @Override
-        public void close() {
-
-        }
     }
 
     private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index 452dd7b..08112a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.test.MockSourceNode;
 import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
-import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -52,14 +51,8 @@ public class SourceNodeTest {
         }
 
         @Override
-        public void configure(final Map<String, ?> configs, final boolean isKey) { }
-
-        @Override
         public String deserialize(final String topic, final byte[] data) {
             return deserialize(topic, null, data);
         }
-
-        @Override
-        public void close() { }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java
index bf1d030..03e0c3a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java
@@ -22,20 +22,8 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
-import java.util.Map;
-
 class SerdeThatDoesntHandleNull implements Serde<String> {
     @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
     public Serializer<String> serializer() {
         return new StringSerializer();
     }
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 9de7798..6f6c51e 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -473,10 +473,6 @@ public class TopologyTestDriverTest {
                     }
                     return Serdes.Integer().serializer().serialize(topic, (Integer) data);
                 }
-                @Override
-                public void close() {}
-                @Override
-                public void configure(final Map configs, final boolean isKey) {}
             },
             new Serializer<Object>() {
                 @Override
@@ -486,10 +482,6 @@ public class TopologyTestDriverTest {
                     }
                     return Serdes.Double().serializer().serialize(topic, (Double) data);
                 }
-                @Override
-                public void close() {}
-                @Override
-                public void configure(final Map configs, final boolean isKey) {}
             },
             processor);
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 5b7e228..887bdc4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -340,21 +340,11 @@ public class ClientCompatibilityTest {
         }
 
         @Override
-        public void configure(Map<String, ?> configs, boolean isKey) {
-            // nothing to do
-        }
-
-        @Override
         public byte[] deserialize(String topic, byte[] data) {
             return data;
         }
 
         @Override
-        public void close() {
-            // nothing to do
-        }
-
-        @Override
         public void onUpdate(ClusterResource clusterResource) {
             if (expectClusterId) {
                 if (clusterResource.clusterId() == null) {


Mime
View raw message