From commits-return-9478-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Tue May 8 16:15:40 2018 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 61060188D3 for ; Tue, 8 May 2018 16:15:40 +0000 (UTC) Received: (qmail 88392 invoked by uid 500); 8 May 2018 16:15:40 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 88341 invoked by uid 500); 8 May 2018 16:15:40 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 88331 invoked by uid 99); 8 May 2018 16:15:40 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 May 2018 16:15:40 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8854980810; Tue, 8 May 2018 16:15:39 +0000 (UTC) Date: Tue, 08 May 2018 16:15:39 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: MINOR: Make Serdes less confusing in Scala (#4963) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152579613881.16427.10664258859147733550@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 2b5a59406622affa1e333e073546f075f59b4ac9 X-Git-Newrev: b88d70b53290af715034a1f772a271f7e44505fd X-Git-Rev: b88d70b53290af715034a1f772a271f7e44505fd X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b88d70b MINOR: Make Serdes less confusing in Scala (#4963) b88d70b is described below commit b88d70b53290af715034a1f772a271f7e44505fd Author: Joan Goyeau AuthorDate: Tue May 8 17:15:31 2018 +0100 MINOR: Make Serdes less confusing in Scala (#4963) Serdes are confusing in the Scala wrapper: * We have wrappers around Serializer, Deserializer and Serde which are not very useful. * We have Serdes in 2 places org.apache.kafka.common.serialization.Serde and in DefaultSerdes, instead we should be having only one place where to find all the Serdes. I wanted to do this PR before the release as this is a breaking change. This shouldn't add more so the current tests should be enough. Reviewers: Debasish Ghosh , Guozhang Wang --- docs/streams/developer-guide/dsl-api.html | 11 ++-- docs/streams/index.html | 2 +- .../apache/kafka/streams/scala/DefaultSerdes.scala | 47 -------------- .../apache/kafka/streams/scala/ScalaSerde.scala | 70 --------------------- .../org/apache/kafka/streams/scala/Serdes.scala | 71 ++++++++++++++++++++++ .../kafka/streams/scala/StreamsBuilder.scala | 4 +- .../kafka/streams/scala/kstream/KStream.scala | 12 ++-- ...bleJoinScalaIntegrationTestImplicitSerdes.scala | 16 ++--- .../apache/kafka/streams/scala/TopologyTest.scala | 14 ++--- .../apache/kafka/streams/scala/WordCountTest.scala | 10 +-- 10 files changed, 105 insertions(+), 152 deletions(-) diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html index 2b25072..687dff9 100644 --- a/docs/streams/developer-guide/dsl-api.html +++ b/docs/streams/developer-guide/dsl-api.html @@ -3165,8 +3165,7 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res

The library also has several utility abstractions and modules that the user needs to use for proper semantics.

  • org.apache.kafka.streams.scala.ImplicitConversions: Module that brings into scope the implicit conversions between the Scala and Java classes.
  • -
  • org.apache.kafka.streams.scala.DefaultSerdes: Module that brings into scope the implicit values of all primitive SerDes.
  • -
  • org.apache.kafka.streams.scala.ScalaSerde: Base abstraction that can be used to implement custom SerDes in a type safe way.
  • +
  • org.apache.kafka.streams.scala.Serdes: Module that contains all primitive SerDes that can be imported as implicits and a helper to create custom SerDes.

The library is cross-built with Scala 2.11 and 2.12. To reference the library compiled against Scala 2.11 include the following in your maven pom.xml add the following:

@@ -3197,7 +3196,7 @@ import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
 object WordCountApplication extends App {
-  import DefaultSerdes._
+  import Serdes._
 
   val config: Properties = {
     val p = new Properties()
@@ -3235,7 +3234,7 @@ object WordCountApplication extends App {
 // that will set up all Serialized, Produced, Consumed and Joined instances.
 // So all APIs below that accept Serialized, Produced, Consumed or Joined will
 // get these instances automatically
-import DefaultSerdes._
+import Serdes._
 
 val builder = new StreamsBuilder()
 
@@ -3260,7 +3259,7 @@ clicksPerRegion.toStream.to(outputTopic)
               

Quite a few things are going on in the above code snippet that may warrant a few lines of elaboration:

  1. The code snippet does not depend on any config defined SerDes. In fact any SerDes defined as part of the config will be ignored.
  2. -
  3. All SerDes are picked up from the implicits in scope. And import DefaultSerdes._ brings all necessary SerDes in scope.
  4. +
  5. All SerDes are picked up from the implicits in scope. And import Serdes._ brings all necessary SerDes in scope.
  6. This is an example of compile time type safety that we don't have in the Java APIs.
  7. The code looks less verbose and more focused towards the actual transformation that it does on the data stream.
@@ -3277,7 +3276,7 @@ case class UserClicks(clicks: Long) implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde // Primitive SerDes -import DefaultSerdes._ +import Serdes._ // And then business as usual .. diff --git a/docs/streams/index.html b/docs/streams/index.html index 72e1323..6dfaf6b 100644 --- a/docs/streams/index.html +++ b/docs/streams/index.html @@ -261,7 +261,7 @@ import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} object WordCountApplication extends App { - import DefaultSerdes._ + import Serdes._ val config: Properties = { val p = new Properties() diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala deleted file mode 100644 index 3f2840e..0000000 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (C) 2018 Lightbend Inc. - * Copyright (C) 2017-2018 Alexis Seigneurin. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.scala - -import java.nio.ByteBuffer - -import org.apache.kafka.common.serialization.{Serde, Serdes} -import org.apache.kafka.common.utils.Bytes -import org.apache.kafka.streams.kstream.WindowedSerdes - - -/** - * Implicit values for default serdes. - *

- * Bring them in scope for default serializers / de-serializers to work. - */ -object DefaultSerdes { - implicit val stringSerde: Serde[String] = Serdes.String() - implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]] - implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray() - implicit val bytesSerde: Serde[Bytes] = Serdes.Bytes() - implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]] - implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]] - implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]] - implicit val shortSerde: Serde[Short] = Serdes.Short().asInstanceOf[Serde[Short]] - implicit val byteBufferSerde: Serde[ByteBuffer] = Serdes.ByteBuffer() - - implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]() - implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]() -} diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala deleted file mode 100644 index 06afcae..0000000 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (C) 2018 Lightbend Inc. - * Copyright (C) 2017-2018 Alexis Seigneurin. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.scala - -import org.apache.kafka.common.serialization.{Serde, Deserializer => JDeserializer, Serializer => JSerializer} - -trait ScalaSerde[T] extends Serde[T] { - override def deserializer(): JDeserializer[T] - - override def serializer(): JSerializer[T] - - override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = () - - override def close(): Unit = () -} - -trait SimpleScalaSerde[T >: Null] extends Serde[T] with ScalaSerde[T] { - def serialize(data: T): Array[Byte] - def deserialize(data: Array[Byte]): Option[T] - - private def outerSerialize(data: T): Array[Byte] = serialize(data) - private def outerDeserialize(data: Array[Byte]): Option[T] = deserialize(data) - - override def deserializer(): Deserializer[T] = new Deserializer[T] { - override def deserialize(data: Array[Byte]): Option[T] = outerDeserialize(data) - } - - override def serializer(): Serializer[T] = new Serializer[T] { - override def serialize(data: T): Array[Byte] = outerSerialize(data) - } -} - -trait Deserializer[T >: Null] extends JDeserializer[T] { - override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = () - - override def close(): Unit = () - - override def deserialize(topic: String, data: Array[Byte]): T = - Option(data).flatMap(deserialize).orNull - - def deserialize(data: Array[Byte]): Option[T] -} - -trait Serializer[T] extends JSerializer[T] { - override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = () - - override def close(): Unit = () - - override def serialize(topic: String, data: T): Array[Byte] = - Option(data).map(serialize).orNull - - def serialize(data: T): Array[Byte] -} diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala new file mode 100644 index 0000000..a0ffffa --- /dev/null +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + * Copyright (C) 2017-2018 Alexis Seigneurin. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.scala + +import java.util + +import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, Serdes => JSerdes} +import org.apache.kafka.streams.kstream.WindowedSerdes + +object Serdes { + implicit val String: Serde[String] = JSerdes.String() + implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]] + implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long() + implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray() + implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes() + implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]] + implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float() + implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]] + implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double() + implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]] + implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer() + + implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]() + implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]() + + def fromFn[T >: Null](serializer: T => Array[Byte], deserializer: Array[Byte] => Option[T]): Serde[T] = + JSerdes.serdeFrom( + new Serializer[T] { + override def serialize(topic: String, data: T): Array[Byte] = serializer(data) + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () + override def close(): Unit = () + }, + new Deserializer[T] { + override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () + override def close(): Unit = () + } + ) + + def fromFn[T >: Null](serializer: (String, T) => Array[Byte], + deserializer: (String, Array[Byte]) => Option[T]): Serde[T] = + JSerdes.serdeFrom( + new Serializer[T] { + override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data) + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () + override def close(): Unit = () + }, + new Deserializer[T] { + override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () + override def close(): Unit = () + } + ) +} diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala index 9e6e204..397af32 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala @@ -48,7 +48,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { * import ImplicitConversions._ * * // Bring implicit default serdes in scope - * import DefaultSerdes._ + * import Serdes._ * * val builder = new StreamsBuilder() * @@ -98,7 +98,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { * import ImplicitConversions._ * * // Bring implicit default serdes in scope - * import DefaultSerdes._ + * import Serdes._ * * val builder = new StreamsBuilder() * diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 7634b95..e3e8470 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -206,7 +206,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * Example: * * // brings implicit serdes in scope - * import DefaultSerdes._ + * import Serdes._ * * //.. * val clicksPerRegion: KTable[String, Long] = //.. @@ -238,7 +238,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * Example: * * // brings implicit serdes in scope - * import DefaultSerdes._ + * import Serdes._ * * //.. * val clicksPerRegion: KTable[String, Long] = //.. @@ -354,7 +354,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * Example: * * // brings implicit serdes in scope - * import DefaultSerdes._ + * import Serdes._ * * val clicksPerRegion: KTable[String, Long] = * userClicksStream @@ -362,7 +362,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * .map((_, regionWithClicks) => regionWithClicks) * * // the groupByKey gets the Serialized instance through an implicit conversion of the - * // serdes brought into scope through the import DefaultSerdes._ above + * // serdes brought into scope through the import Serdes._ above * .groupByKey * .reduce(_ + _) * @@ -388,7 +388,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * Example: * * // brings implicit serdes in scope - * import DefaultSerdes._ + * import Serdes._ * * val textLines = streamBuilder.stream[String, String](inputTopic) * @@ -398,7 +398,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * textLines.flatMapValues(v => pattern.split(v.toLowerCase)) * * // the groupBy gets the Serialized instance through an implicit conversion of the - * // serdes brought into scope through the import DefaultSerdes._ above + * // serdes brought into scope through the import Serdes._ above * .groupBy((k, v) => v) * * .count() diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index e701431..113458e 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -73,7 +73,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced, // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will // get these instances automatically - import DefaultSerdes._ + import Serdes._ val streamsConfiguration: Properties = getStreamsConfiguration() @@ -122,16 +122,16 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite val streamsConfiguration: Properties = getStreamsConfiguration() - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) val builder: StreamsBuilderJ = new StreamsBuilderJ() val userClicksStream: KStreamJ[String, JLong] = - builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String(), Serdes.Long())) + builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String, Serdes.JavaLong)) val userRegionsTable: KTableJ[String, String] = - builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String(), Serdes.String())) + builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String, Serdes.String)) // Join the stream against the table. val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream @@ -140,7 +140,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite def apply(clicks: JLong, region: String): (String, JLong) = (if (region == null) "UNKNOWN" else region, clicks) }, - Joined.`with`[String, JLong, String](Serdes.String(), Serdes.Long(), Serdes.String())) + Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String)) // Change the stream from -> to -> val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion @@ -152,7 +152,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite // Compute the total per region by summing the individual click counts per region. val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion - .groupByKey(Serialized.`with`(Serdes.String(), Serdes.Long())) + .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong)) .reduce { new Reducer[JLong] { def apply(v1: JLong, v2: JLong) = v1 + v2 @@ -160,7 +160,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite } // Write the (continuously updating) results to the output topic. - clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String(), Serdes.Long())) + clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong)) val streams: KafkaStreamsJ = new KafkaStreamsJ(builder.build(), streamsConfiguration) diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index 71d4834..9495ea7 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -52,7 +52,7 @@ class TopologyTest extends JUnitSuite { // build the Scala topology def getTopologyScala(): TopologyDescription = { - import DefaultSerdes._ + import Serdes._ import collection.JavaConverters._ val streamBuilder = new StreamsBuilder @@ -87,7 +87,7 @@ class TopologyTest extends JUnitSuite { // build the Scala topology def getTopologyScala(): TopologyDescription = { - import DefaultSerdes._ + import Serdes._ import collection.JavaConverters._ val streamBuilder = new StreamsBuilder @@ -132,7 +132,7 @@ class TopologyTest extends JUnitSuite { // build the Scala topology def getTopologyScala(): TopologyDescription = { - import DefaultSerdes._ + import Serdes._ val builder = new StreamsBuilder() @@ -158,10 +158,10 @@ class TopologyTest extends JUnitSuite { val builder: StreamsBuilderJ = new StreamsBuilderJ() val userClicksStream: KStreamJ[String, JLong] = - builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String(), Serdes.Long())) + builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String, Serdes.JavaLong)) val userRegionsTable: KTableJ[String, String] = - builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String(), Serdes.String())) + builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String, Serdes.String)) // Join the stream against the table. val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream @@ -170,7 +170,7 @@ class TopologyTest extends JUnitSuite { def apply(clicks: JLong, region: String): (String, JLong) = (if (region == null) "UNKNOWN" else region, clicks) }, - Joined.`with`[String, JLong, String](Serdes.String(), Serdes.Long(), Serdes.String())) + Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String)) // Change the stream from -> to -> val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion @@ -182,7 +182,7 @@ class TopologyTest extends JUnitSuite { // Compute the total per region by summing the individual click counts per region. val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion - .groupByKey(Serialized.`with`(Serdes.String(), Serdes.Long())) + .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong)) .reduce { new Reducer[JLong] { def apply(v1: JLong, v2: JLong) = v1 + v2 diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala index e827a3c..17fa35c 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala @@ -75,7 +75,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData { @Test def testShouldCountWords(): Unit = { - import DefaultSerdes._ + import Serdes._ val streamsConfiguration = getStreamsConfiguration() @@ -112,8 +112,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData { import collection.JavaConverters._ val streamsConfiguration = getStreamsConfiguration() - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) val streamBuilder = new StreamsBuilderJ val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ) @@ -134,7 +134,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData { val wordCounts: KTableJ[String, java.lang.Long] = grouped.count() - wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.String(), Serdes.Long())) + wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong)) val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration) streams.start() @@ -153,7 +153,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000") streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()) + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath) streamsConfiguration } -- To stop receiving notification emails like this one, please contact guozhang@apache.org.