kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6923; Refactor Serializer/Deserializer for KIP-336 (#5494)
Date Thu, 20 Sep 2018 22:55:20 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f1f7192  KAFKA-6923; Refactor Serializer/Deserializer for KIP-336 (#5494)
f1f7192 is described below

commit f1f719211e5f28fe5163e65dba899b1da796a8e0
Author: Viktor Somogyi <viktorsomogyi@gmail.com>
AuthorDate: Fri Sep 21 00:55:10 2018 +0200

    KAFKA-6923; Refactor Serializer/Deserializer for KIP-336 (#5494)
    
    This patch implements KIP-336. It adds default methods to the Serializer and
    Deserializer interfaces to support record headers, and it deprecates the
    ExtendedSerializer and ExtendedDeserializer interfaces for later removal.
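    
    For migration, a class that implemented ExtendedSerializer<T> for header access
    can now implement Serializer<T> directly and keep the same three-argument method.
    A minimal, hypothetical sketch (class and header names are illustrative, not part
    of this patch):
    
        import java.nio.ByteBuffer;
        import java.util.Map;
        import org.apache.kafka.common.header.Headers;
        import org.apache.kafka.common.serialization.Serializer;
    
        // Hypothetical example: this class previously declared
        // "implements ExtendedSerializer<Long>"; the plain interface now
        // carries the headers overload as a default method.
        public class LongWithHeaderSerializer implements Serializer<Long> {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }
    
            @Override
            public byte[] serialize(String topic, Long data) {
                return data == null ? null : ByteBuffer.allocate(8).putLong(data).array();
            }
    
            @Override
            public byte[] serialize(String topic, Headers headers, Long data) {
                headers.add("schema-id", new byte[] {1}); // illustrative header
                return serialize(topic, data);
            }
    
            @Override
            public void close() { }
        }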
    
    Reviewers: Satish Duggana <sduggana@hortonworks.com>, John Roesler <john@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 10 ++-
 .../kafka/clients/producer/KafkaProducer.java      | 18 +++--
 .../kafka/clients/producer/MockProducer.java       | 11 ++-
 .../kafka/common/serialization/Deserializer.java   | 13 ++++
 .../common/serialization/ExtendedDeserializer.java |  2 +
 .../common/serialization/ExtendedSerializer.java   |  2 +
 .../kafka/common/serialization/Serializer.java     | 14 ++++
 .../kafka/clients/producer/KafkaProducerTest.java  | 19 +++--
 .../kafka/api/PlaintextConsumerTest.scala          | 84 ++++++++++++----------
 .../kstream/internals/ChangedDeserializer.java     | 13 ++--
 .../kstream/internals/ChangedSerializer.java       | 11 ++-
 .../streams/processor/internals/SourceNode.java    | 15 ++--
 12 files changed, 124 insertions(+), 88 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 86ee74e..2a6ac0c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -62,7 +62,6 @@ import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -90,7 +89,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyList;
-import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
 
 /**
  * This class manages the fetching process with the brokers.
@@ -127,8 +125,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private final SubscriptionState subscriptions;
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
     private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
-    private final ExtendedDeserializer<K> keyDeserializer;
-    private final ExtendedDeserializer<V> valueDeserializer;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
     private final IsolationLevel isolationLevel;
     private final Map<Integer, FetchSessionHandler> sessionHandlers;
     private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
@@ -165,8 +163,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
         this.checkCrcs = checkCrcs;
-        this.keyDeserializer = ensureExtended(keyDeserializer);
-        this.valueDeserializer = ensureExtended(valueDeserializer);
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
         this.completedFetches = new ConcurrentLinkedQueue<>();
         this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
         this.retryBackoffMs = retryBackoffMs;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 2a35b30..b3f76ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -73,7 +73,6 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
@@ -81,7 +80,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 
-import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
 
 /**
  * A Kafka client that publishes records to the Kafka cluster.
@@ -250,8 +248,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final CompressionType compressionType;
     private final Sensor errors;
     private final Time time;
-    private final ExtendedSerializer<K> keySerializer;
-    private final ExtendedSerializer<V> valueSerializer;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
     private final ProducerConfig producerConfig;
     private final long maxBlockTimeMs;
     private final int requestTimeoutMs;
@@ -361,20 +359,20 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             if (keySerializer == null) {
-                this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                                                                                 Serializer.class));
+                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                                                                  Serializer.class);
                 this.keySerializer.configure(config.originals(), true);
             } else {
                 config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-                this.keySerializer = ensureExtended(keySerializer);
+                this.keySerializer = keySerializer;
             }
             if (valueSerializer == null) {
-                this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                                                                                   Serializer.class));
+                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                                                                    Serializer.class);
                 this.valueSerializer.configure(config.originals(), false);
             } else {
                 config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-                this.valueSerializer = ensureExtended(valueSerializer);
+                this.valueSerializer = valueSerializer;
             }
 
             // load interceptors and make sure they get clientId
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index ab036a0..a38bd04 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
 import java.util.ArrayDeque;
@@ -41,8 +40,6 @@ import java.util.Objects;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
-
 /**
  * A mock of the producer interface you can use for testing code that uses Kafka.
  * <p>
@@ -59,8 +56,8 @@ public class MockProducer<K, V> implements Producer<K, V> {
     private final Map<TopicPartition, Long> offsets;
     private final List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsets;
     private Map<String, Map<TopicPartition, OffsetAndMetadata>> uncommittedConsumerGroupOffsets;
-    private final ExtendedSerializer<K> keySerializer;
-    private final ExtendedSerializer<V> valueSerializer;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
     private boolean autoComplete;
     private boolean closed;
     private boolean transactionInitialized;
@@ -93,8 +90,8 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.cluster = cluster;
         this.autoComplete = autoComplete;
         this.partitioner = partitioner;
-        this.keySerializer = ensureExtended(keySerializer);
-        this.valueSerializer = ensureExtended(valueSerializer);
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
         this.offsets = new HashMap<>();
         this.sent = new ArrayList<>();
         this.uncommittedSends = new ArrayList<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index f9eb398..bc1a714 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.header.Headers;
+
 import java.io.Closeable;
 import java.util.Map;
 
@@ -45,6 +47,17 @@ public interface Deserializer<T> extends Closeable {
      */
     T deserialize(String topic, byte[] data);
 
+    /**
+     * Deserialize a record value from a byte array into a value or object.
+     * @param topic topic associated with the data
+     * @param headers headers associated with the record; may be empty.
+     * @param data serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.
+     * @return deserialized typed data; may be null
+     */
+    default T deserialize(String topic, Headers headers, byte[] data) {
+        return deserialize(topic, data);
+    }
+
     @Override
     void close();
 }
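
With this default in place, a header-aware deserializer only needs to override the
three-argument overload; the two-argument path keeps working for callers that have no
headers. A minimal sketch, assuming a hypothetical "content-type" header:

    import java.util.Map;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.Deserializer;

    public class ContentTypeAwareDeserializer implements Deserializer<byte[]> {
        private final ByteArrayDeserializer inner = new ByteArrayDeserializer();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public byte[] deserialize(String topic, byte[] data) {
            return inner.deserialize(topic, data);
        }

        @Override // overrides the new default shown above
        public byte[] deserialize(String topic, Headers headers, byte[] data) {
            Header contentType = headers.lastHeader("content-type");
            if (contentType != null) {
                // dispatch on new String(contentType.value()) here if desired
            }
            return deserialize(topic, data);
        }

        @Override
        public void close() {
            inner.close();
        }
    }
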
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
index 2d5be4a..2f4a012 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
@@ -29,7 +29,9 @@ import org.apache.kafka.common.header.Headers;
  *
  * A class that implements this interface is expected to have a constructor with no parameters.
  * @param <T>
+ * @deprecated This interface has been deprecated and will be removed in a future release. Please use {@link Deserializer} instead.
  */
+@Deprecated
 public interface ExtendedDeserializer<T> extends Deserializer<T> {
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
index 14fbb47..8c94980 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
@@ -29,7 +29,9 @@ import org.apache.kafka.common.header.Headers;
  *
  * A class that implements this interface is expected to have a constructor with no parameters.
  * @param <T>
+ * @deprecated This interface has been deprecated and will be removed in a future release. Please use {@link Serializer} instead.
  */
+@Deprecated
 public interface ExtendedSerializer<T> extends Serializer<T> {
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index 96fe86b..c5d4760 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.header.Headers;
+
 import java.io.Closeable;
 import java.util.Map;
 
@@ -47,6 +49,18 @@ public interface Serializer<T> extends Closeable {
     byte[] serialize(String topic, T data);
 
     /**
+     * Convert {@code data} into a byte array.
+     *
+     * @param topic topic associated with data
+     * @param headers headers associated with the record
+     * @param data typed data
+     * @return serialized bytes
+     */
+    default byte[] serialize(String topic, Headers headers, T data) {
+        return serialize(topic, data);
+    }
+
+    /**
      * Close this serializer.
      *
      * This method must be idempotent as it may be called multiple times.
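
Because the new default simply delegates, header-unaware serializers behave exactly as
before even when callers pass headers. A small sketch of that delegation (RecordHeaders
is the client-internal Headers implementation, used here only for illustration):

    import java.util.Arrays;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class DefaultDelegationDemo {
        public static void main(String[] args) {
            StringSerializer s = new StringSerializer();
            byte[] plain = s.serialize("topic", "value");
            // StringSerializer does not override the new overload, so the default
            // method forwards to serialize(topic, data) and ignores the headers.
            byte[] withHeaders = s.serialize("topic", new RecordHeaders(), "value");
            System.out.println(Arrays.equals(plain, withHeaders)); // prints true
        }
    }
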
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 22fa0a1..676aafd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.ExtendedSerializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -416,15 +417,25 @@ public class KafkaProducerTest {
         Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
     }
 
+    @SuppressWarnings("unchecked") // safe as generic parameters won't vary
+    @PrepareOnlyThisForTest(Metadata.class)
+    @Test
+    public void testHeadersWithExtendedClasses() throws Exception {
+        doTestHeaders(ExtendedSerializer.class);
+    }
+
+    @SuppressWarnings("unchecked")
     @PrepareOnlyThisForTest(Metadata.class)
     @Test
     public void testHeaders() throws Exception {
+        doTestHeaders(Serializer.class);
+    }
+
+    private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) throws Exception {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
-        ExtendedSerializer<String> keySerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
-        @SuppressWarnings("unchecked")
-        ExtendedSerializer<String> valueSerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
+        T keySerializer = PowerMock.createNiceMock(serializerClassToMock);
+        T valueSerializer = PowerMock.createNiceMock(serializerClassToMock);
 
         KafkaProducer<String, String> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
         Metadata metadata = PowerMock.createNiceMock(Metadata.class);
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 1031483..fc5a24f 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -66,61 +66,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
-  @Test
-  def testHeadersExtendedSerializerDeserializer() {
-    val numRecords = 1
-    val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
-
-    val extendedSerializer = new ExtendedSerializer[Array[Byte]] {
-
-      var serializer = new ByteArraySerializer()
-
-      override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
-        headers.add("content-type", "application/octet-stream".getBytes)
-        serializer.serialize(topic, data)
-      }
+  trait SerializerImpl {
+    var serializer = new ByteArraySerializer()
 
-      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = serializer.configure(configs, isKey)
-
-      override def close(): Unit = serializer.close()
-
-      override def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
-        fail("method should not be invoked")
-        null
-      }
+    def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
+      headers.add("content-type", "application/octet-stream".getBytes)
+      serializer.serialize(topic, data)
     }
 
+    def configure(configs: util.Map[String, _], isKey: Boolean): Unit = serializer.configure(configs, isKey)
 
-    val extendedDeserializer = new ExtendedDeserializer[Array[Byte]] {
-
-      var deserializer = new ByteArrayDeserializer()
+    def close(): Unit = serializer.close()
 
-      override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
-        val header = headers.lastHeader("content-type")
-        assertEquals("application/octet-stream", if (header == null) null else new String(header.value()))
-        deserializer.deserialize(topic, data)
-      }
+    def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
+      fail("method should not be invoked")
+      null
+    }
+  }
 
-      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = deserializer.configure(configs, isKey)
+  trait DeserializerImpl {
+    var deserializer = new ByteArrayDeserializer()
 
+    def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
+      val header = headers.lastHeader("content-type")
+      assertEquals("application/octet-stream", if (header == null) null else new String(header.value()))
+      deserializer.deserialize(topic, data)
+    }
 
-      override def close(): Unit = deserializer.close()
+    def configure(configs: util.Map[String, _], isKey: Boolean): Unit = deserializer.configure(configs, isKey)
 
-      override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = {
-        fail("method should not be invoked")
-        null
-      }
+    def close(): Unit = deserializer.close()
 
+    def deserialize(topic: String, data: Array[Byte]): Array[Byte] = {
+      fail("method should not be invoked")
+      null
     }
+  }
+
+  private def testHeadersSerializeDeserialize(serializer: Serializer[Array[Byte]], deserializer: Deserializer[Array[Byte]]): Unit = {
+    val numRecords = 1
+    val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
 
     val producer = createProducer(
       keySerializer = new ByteArraySerializer,
-      valueSerializer = extendedSerializer)
+      valueSerializer = serializer)
     producer.send(record)
 
     val consumer = createConsumer(
       keyDeserializer = new ByteArrayDeserializer,
-      valueDeserializer = extendedDeserializer)
+      valueDeserializer = deserializer)
     assertEquals(0, consumer.assignment.size)
     consumer.assign(List(tp).asJava)
     assertEquals(1, consumer.assignment.size)
@@ -132,6 +126,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
+  def testHeadersExtendedSerializerDeserializer(): Unit = {
+    val extendedSerializer = new ExtendedSerializer[Array[Byte]] with SerializerImpl
+    val extendedDeserializer = new ExtendedDeserializer[Array[Byte]] with DeserializerImpl
+
+    testHeadersSerializeDeserialize(extendedSerializer, extendedDeserializer)
+  }
+
+  @Test
+  def testHeadersSerializerDeserializer(): Unit = {
+    val extendedSerializer = new Serializer[Array[Byte]] with SerializerImpl
+    val extendedDeserializer = new Deserializer[Array[Byte]] with DeserializerImpl
+
+    testHeadersSerializeDeserialize(extendedSerializer, extendedDeserializer)
+  }
+
+  @Test
   def testMaxPollRecords() {
     val maxPollRecords = 2
     val numRecords = 10000
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 29180f2..56193d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -18,29 +18,26 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
-
-public class ChangedDeserializer<T> implements ExtendedDeserializer<Change<T>> {
+public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
 
     private static final int NEWFLAG_SIZE = 1;
 
-    private ExtendedDeserializer<T> inner;
+    private Deserializer<T> inner;
 
     public ChangedDeserializer(final Deserializer<T> inner) {
-        this.inner = ensureExtended(inner);
+        this.inner = inner;
     }
 
-    public ExtendedDeserializer<T> inner() {
+    public Deserializer<T> inner() {
         return inner;
     }
 
     public void setInner(final Deserializer<T> inner) {
-        this.inner = ensureExtended(inner);
+        this.inner = inner;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 83ac8e0..8a76bad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -17,23 +17,20 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
-
-public class ChangedSerializer<T> implements ExtendedSerializer<Change<T>> {
+public class ChangedSerializer<T> implements Serializer<Change<T>> {
 
     private static final int NEWFLAG_SIZE = 1;
 
-    private ExtendedSerializer<T> inner;
+    private Serializer<T> inner;
 
     public ChangedSerializer(final Serializer<T> inner) {
-        this.inner = ensureExtended(inner);
+        this.inner = inner;
     }
 
     public Serializer<T> inner() {
@@ -41,7 +38,7 @@ public class ChangedSerializer<T> implements ExtendedSerializer<Change<T>> {
     }
 
     public void setInner(final Serializer<T> inner) {
-        this.inner = ensureExtended(inner);
+        this.inner = inner;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 4aaba9e..d7627fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -18,22 +18,19 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
 import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.List;
 
-import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
-
 public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
     private final List<String> topics;
 
     private ProcessorContext context;
-    private ExtendedDeserializer<K> keyDeserializer;
-    private ExtendedDeserializer<V> valDeserializer;
+    private Deserializer<K> keyDeserializer;
+    private Deserializer<V> valDeserializer;
     private final TimestampExtractor timestampExtractor;
 
     public SourceNode(final String name,
@@ -44,8 +41,8 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         super(name);
         this.topics = topics;
         this.timestampExtractor = timestampExtractor;
-        this.keyDeserializer = ensureExtended(keyDeserializer);
-        this.valDeserializer = ensureExtended(valDeserializer);
+        this.keyDeserializer = keyDeserializer;
+        this.valDeserializer = valDeserializer;
     }
 
     public SourceNode(final String name,
@@ -71,9 +68,9 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
         // if deserializers are null, get the default ones from the context
         if (this.keyDeserializer == null)
-            this.keyDeserializer = ensureExtended((Deserializer<K>) context.keySerde().deserializer());
+            this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
         if (this.valDeserializer == null)
-            this.valDeserializer = ensureExtended((Deserializer<V>) context.valueSerde().deserializer());
+            this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
 
         // if value deserializers are for {@code Change} values, set the inner deserializer when necessary
         if (this.valDeserializer instanceof ChangedDeserializer &&

