kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7386: streams-scala should not cache serdes (#5622)
Date Tue, 11 Sep 2018 23:17:53 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9dac615  KAFKA-7386: streams-scala should not cache serdes (#5622)
9dac615 is described below

commit 9dac615d228c5b3464c6322aea9f9ce70f9ef37b
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Tue Sep 11 18:17:47 2018 -0500

    KAFKA-7386: streams-scala should not cache serdes (#5622)
    
    Currently, scala.Serdes.String, for example, invokes Serdes.String() once and caches the
result.
    
    However, the implementation of the String serde has a non-empty configure method that
is variant in whether it's used as a key or value serde. So we won't get correct execution
if we create one serde and use it for both keys and values.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/scala/Serdes.scala    | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
index 8bfb083..02e5380 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
@@ -25,17 +25,17 @@ import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer,
S
 import org.apache.kafka.streams.kstream.WindowedSerdes
 
 object Serdes {
-  implicit val String: Serde[String] = JSerdes.String()
-  implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
-  implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
-  implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
-  implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
-  implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
-  implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
-  implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
-  implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
-  implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
-  implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def String: Serde[String] = JSerdes.String()
+  implicit def Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
+  implicit def JavaLong: Serde[java.lang.Long] = JSerdes.Long()
+  implicit def ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
+  implicit def Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
+  implicit def Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
+  implicit def JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
+  implicit def Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
+  implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
+  implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
+  implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
 
   implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
   implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] =


Mime
View raw message