kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix type inference on joins and aggregates (#5019)
Date Sun, 20 May 2018 23:25:20 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 96cda0e  MINOR: Fix type inference on joins and aggregates (#5019)
96cda0e is described below

commit 96cda0e07ac4981a642c6b32fa543bcce78be769
Author: Joan Goyeau <joan@goyeau.com>
AuthorDate: Mon May 21 00:25:16 2018 +0100

    MINOR: Fix type inference on joins and aggregates (#5019)
    
    The type inference doesn't currently work for the join functions in the
    Scala API, as the compiler doesn't yet know the types of the given
    KStream[K, V] or KTable[K, V].

    The fix here is to curry the joiner function. I personally prefer this
    notation, but it also means the API differs more from the Java API.
    I believe the difference is worth it in this case: it not only solves
    the type inference but also better fits the Scala way of coding (e.g. fold).

    Moreover, any Scala dev would otherwise stumble over these functions,
    spend time trying to understand why the type inference is not working,
    and get frustrated at being forced to be explicit where inference would
    be harmless.

    Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>, Ismael Juma <ismael@juma.me.uk>
---
 .../streams/scala/kstream/KGroupedStream.scala     | 15 ++--
 .../streams/scala/kstream/KGroupedTable.scala      | 15 ++--
 .../kafka/streams/scala/kstream/KStream.scala      | 94 ++++++++++------------
 .../kafka/streams/scala/kstream/KTable.scala       | 36 ++++-----
 .../scala/kstream/SessionWindowedKStream.scala     | 14 ++--
 .../scala/kstream/TimeWindowedKStream.scala        | 12 +--
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |  4 +-
 .../apache/kafka/streams/scala/TopologyTest.scala  |  2 +-
 8 files changed, 87 insertions(+), 105 deletions(-)
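
The change is easiest to see at a call site. As a sketch, using the stream and table from the integration test updated below (a KStream[String, Long] of user clicks and a KTable[String, String] of user regions, with the implicit Joined derived from implicit Serdes):

    // Before: a single parameter list, so the joiner's parameter types
    // could not be inferred and had to be annotated.
    userClicksStream.leftJoin(userRegionsTable,
      (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))

    // After: the first parameter list fixes V and VT, so the curried
    // joiner's parameter types are inferred.
    userClicksStream.leftJoin(userRegionsTable)(
      (clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))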

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index acffb1f..2e85bce 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -84,8 +84,7 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
    */ 
-  def reduce(reducer: (V, V) => V,
-    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
+  def reduce(reducer: (V, V) => V, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = {
 
    // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
     // works perfectly with Scala 2.12 though
@@ -101,9 +100,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */ 
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator)
 
   /**
    * Aggregate the values of records in this stream by the grouped key.
@@ -115,10 +113,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */ 
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
+                                        materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)
 
   /**
   * Create a new [[SessionWindowedKStream]] instance that can be used to perform session windowed aggregations.
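
The aggregate methods follow the same idea: the initializer is now passed by name in its own parameter list, which pins down VR before the aggregator is type-checked. A minimal word-count-style sketch (hypothetical groupedStream: KGroupedStream[String, String]):

    // Before: one parameter list; VR and the aggregator's parameter types
    // had to be spelled out.
    groupedStream.aggregate(() => 0L, (_: String, _: String, count: Long) => count + 1)

    // After: the by-name initializer fixes VR = Long up front, so the
    // curried aggregator needs no annotations.
    val counts: KTable[String, Long] =
      groupedStream.aggregate(0L)((key, word, count) => count + 1)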
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 673ab5d..87a11c5 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -108,10 +108,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-                    adder: (K, V, VR) => VR,
-                    subtractor: (K, V, VR) => VR): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator)
+  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator)
 
   /**
    * Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]]
@@ -125,9 +123,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-                    adder: (K, V, VR) => VR,
-                    subtractor: (K, V, VR) => VR,
-                    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
-    inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
+  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR,
+                                        subtractor: (K, V, VR) => VR,
+                                        materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 4b0dc2b..49d9fe4 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -47,9 +47,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains only those records that satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KStream#filter`
    */ 
-  def filter(predicate: (K, V) => Boolean): KStream[K, V] = {
+  def filter(predicate: (K, V) => Boolean): KStream[K, V] =
     inner.filter(predicate.asPredicate)
-  }
 
   /**
   * Create a new [[KStream]] that consists all records of this stream which do <em>not</em> satisfy the given
@@ -59,9 +58,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   * @return a [[KStream]] that contains only those records that do <em>not</em> satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KStream#filterNot`
    */ 
-  def filterNot(predicate: (K, V) => Boolean): KStream[K, V] = {
+  def filterNot(predicate: (K, V) => Boolean): KStream[K, V] =
     inner.filterNot(predicate.asPredicate)
-  }
 
   /**
    * Set a new key (with possibly new type) for each input record.
@@ -73,9 +71,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   * @return a [[KStream]] that contains records with new key (possibly of different type) and unmodified value
    * @see `org.apache.kafka.streams.kstream.KStream#selectKey`
    */ 
-  def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] = {
+  def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] =
     inner.selectKey[KR](mapper.asKeyValueMapper)
-  }
 
   /**
   * Transform each record of the input stream into a new record in the output stream (both key and value type can be
@@ -101,9 +98,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#mapValues`
    */ 
-  def mapValues[VR](mapper: V => VR): KStream[K, VR] = {
+  def mapValues[VR](mapper: V => VR): KStream[K, VR] =
     inner.mapValues[VR](mapper.asValueMapper)
-  }
 
   /**
   * Transform the value of each input record into a new value (with possible new type) of the output record.
@@ -114,9 +110,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#mapValues`
    */ 
-  def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] = {
+  def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] =
     inner.mapValues[VR](mapper.asValueMapperWithKey)
-  }
 
   /**
   * Transform each record of the input stream into zero or more records in the output stream (both key and value type
@@ -145,9 +140,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   * @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type
    * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
    */ 
-  def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] = {
+  def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] =
     inner.flatMapValues[VR](mapper.asValueMapper)
-  }
 
   /**
   * Create a new [[KStream]] by transforming the value of each record in this stream into zero or more values
@@ -161,9 +155,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   * @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type
    * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
    */ 
-  def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] = {
+  def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] =
     inner.flatMapValues[VR](mapper.asValueMapperWithKey)
-  }
 
   /**
    * Print the records of this KStream using the options provided by `Printed`
@@ -179,9 +172,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param action an action to perform on each record
    * @see `org.apache.kafka.streams.kstream.KStream#foreach`
    */
-  def foreach(action: (K, V) => Unit): Unit = {
+  def foreach(action: (K, V) => Unit): Unit =
     inner.foreach((k: K, v: V) => action(k, v))
-  }
 
   /**
   * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on
@@ -191,9 +183,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return multiple distinct substreams of this [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#branch`
    */
-  def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] = {
+  def branch(predicates: (K, V) => Boolean*): Array[KStream[K, V]] =
     inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
-  }
 
   /**
   * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for
@@ -304,9 +295,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */ 
   def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR],
-                          stateStoreNames: String*): KStream[K, VR] = {
+                          stateStoreNames: String*): KStream[K, VR] =
     inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
-  }
 
   /**
   * Transform the value of each input record into a new value (with possible new type) of the output record.
@@ -335,9 +325,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param stateStoreNames   the names of the state store used by the processor
    * @see `org.apache.kafka.streams.kstream.KStream#process`
    */ 
-  def process(processorSupplier: () => Processor[K, V],
-    stateStoreNames: String*): Unit = {
-
+  def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit = {
     val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] {
       override def get(): Processor[K, V] = processorSupplier()
     }
@@ -425,11 +413,12 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
    * one for each matched record-pair with the same key and within the joining window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#join`
-   */ 
-  def join[VO, VR](otherStream: KStream[K, VO],
+   */
+  def join[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
-    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
-      inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
+    windows: JoinWindows
+  )(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+    inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
 
   /**
   * Join records of this stream with another [[KTable]]'s records using inner equi join with
@@ -444,10 +433,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
    * one for each matched record-pair with the same key 
    * @see `org.apache.kafka.streams.kstream.KStream#join`
-   */ 
-  def join[VT, VR](table: KTable[K, VT],
-    joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
-      inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)
+   */
+  def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
+    inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)
 
   /**
   * Join records of this stream with `GlobalKTable`'s records using non-windowed inner equi join.
@@ -460,14 +448,15 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                       one output for each input [[KStream]] record
    * @see `org.apache.kafka.streams.kstream.KStream#join`
    */ 
-  def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
+  def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
     keyValueMapper: (K, V) => GK,
-    joiner: (V, GV) => RV): KStream[K, RV] =
-      inner.join[GK, GV, RV](
-        globalKTable,
-        ((k: K, v: V) => keyValueMapper(k, v)).asKeyValueMapper,
-        ((v: V, gv: GV) => joiner(v, gv)).asValueJoiner
-      )
+    joiner: (V, GV) => RV
+  ): KStream[K, RV] =
+    inner.join[GK, GV, RV](
+      globalKTable,
+      ((k: K, v: V) => keyValueMapper(k, v)).asKeyValueMapper,
+      ((v: V, gv: GV) => joiner(v, gv)).asValueJoiner
+    )
 
   /**
   * Join records of this stream with another [[KStream]]'s records using windowed left equi join with
@@ -484,10 +473,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   *                    one for each matched record-pair with the same key and within the joining window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
    */ 
-  def leftJoin[VO, VR](otherStream: KStream[K, VO],
+  def leftJoin[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
-    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
-      inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
+    windows: JoinWindows
+  )(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+    inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
 
   /**
   * Join records of this stream with another [[KTable]]'s records using left equi join with
@@ -503,9 +493,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                 one for each matched record-pair with the same key 
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
    */ 
-  def leftJoin[VT, VR](table: KTable[K, VT],
-    joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
-      inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
+  def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
+    inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
 
   /**
   * Join records of this stream with `GlobalKTable`'s records using non-windowed left equi join.
@@ -518,12 +507,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    *                       one output for each input [[KStream]] record
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
    */ 
-  def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
+  def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
     keyValueMapper: (K, V) => GK,
-    joiner: (V, GV) => RV): KStream[K, RV] = {
-
+    joiner: (V, GV) => RV
+  ): KStream[K, RV] =
     inner.leftJoin[GK, GV, RV](globalKTable, keyValueMapper.asKeyValueMapper, joiner.asValueJoiner)
-  }
 
   /**
   * Join records of this stream with another [[KStream]]'s records using windowed outer equi join with
@@ -540,10 +528,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * one for each matched record-pair with the same key and within the joining window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#outerJoin`
    */ 
-  def outerJoin[VO, VR](otherStream: KStream[K, VO],
+  def outerJoin[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
-    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
-      inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
+    windows: JoinWindows
+  )(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
+    inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
 
   /**
    * Merge this stream and the given stream into one larger stream.
@@ -567,7 +556,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param action an action to perform on each record
    * @see `org.apache.kafka.streams.kstream.KStream#peek`
    */
-  def peek(action: (K, V) => Unit): KStream[K, V] = {
+  def peek(action: (K, V) => Unit): KStream[K, V] =
     inner.peek((k: K, v: V) => action(k, v))
-  }
 }
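
For the windowed stream-stream joins, the other stream moves into the first parameter list and the joiner and windows into the second, with the Joined instance staying implicit. A sketch under assumed names (left: KStream[String, Long], right: KStream[String, String], and an implicit Joined[String, Long, String] in scope):

    import org.apache.kafka.streams.kstream.JoinWindows

    // K, V and VO are fixed by left and right in the first parameter list,
    // so the joiner (V, VO) => VR needs no annotations.
    val joined: KStream[String, (Long, String)] =
      left.join(right)((l, r) => (l, r), JoinWindows.of(5000L)) // 5 s window, in ms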
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 65cf895..cff1844 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -225,7 +225,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @param serialized    the `Serialized` instance used to specify `Serdes`
    * @return a [[KGroupedTable]] that contains the re-grouped records of the original [[KTable]]
    * @see `org.apache.kafka.streams.kstream.KTable#groupBy`
-   */ 
+   */
  def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Serialized[KR, VR]): KGroupedTable[KR, VR] =
     inner.groupBy(selector.asKeyValueMapper, serialized)
 
@@ -237,9 +237,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#join`
-   */ 
-  def join[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] =
+   */
+  def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] =
     inner.join[VO, VR](other.inner, joiner.asValueJoiner)
 
   /**
@@ -252,10 +251,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#join`
-   */ 
-  def join[VO, VR](other: KTable[K, VO],
+   */
+  def join[VO, VR](other: KTable[K, VO])(
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
   /**
@@ -266,9 +266,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
-   */ 
-  def leftJoin[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] =
+   */
+  def leftJoin[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] =
     inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner)
 
   /**
@@ -281,10 +280,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
-   */ 
-  def leftJoin[VO, VR](other: KTable[K, VO],
+   */
+  def leftJoin[VO, VR](other: KTable[K, VO])(
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
   /**
@@ -295,9 +295,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
-   */ 
-  def outerJoin[VO, VR](other: KTable[K, VO],
-    joiner: (V, VO) => VR): KTable[K, VR] =
+   */
+  def outerJoin[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR] =
     inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner)
 
   /**
@@ -311,9 +310,10 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
    */ 
-  def outerJoin[VO, VR](other: KTable[K, VO],
+  def outerJoin[VO, VR](other: KTable[K, VO])(
     joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 1e25554..fd2a565 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -45,10 +45,9 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR,
-    merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, merger.asMerger)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
+                                        merger: (K, VR, VR) => VR): KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger)
 
   /**
    * Aggregate the values of records in this stream by the grouped key and defined `SessionWindows`.
@@ -61,11 +60,12 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
+  def aggregate[VR](initializer: => VR)(
     aggregator: (K, V, VR) => VR,
     merger: (K, VR, VR) => VR,
-    materialized: Materialized[K, VR, ByteArraySessionStore]): KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
+    materialized: Materialized[K, VR, ByteArraySessionStore]
+  ): KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
 
   /**
    * Count the number of records in this stream by the grouped key into `SessionWindows`.
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index b00d025..a16c72b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -44,9 +44,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
-    aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator)
 
   /**
    * Aggregate the values of records in this stream by the grouped key.
@@ -58,10 +57,11 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: () => VR,
+  def aggregate[VR](initializer: => VR)(
     aggregator: (K, V, VR) => VR,
-    materialized: Materialized[K, VR, ByteArrayWindowStore]): KTable[Windowed[K], VR] =
-    inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
+    materialized: Materialized[K, VR, ByteArrayWindowStore]
+  ): KTable[Windowed[K], VR] =
+    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)
 
   /**
    * Count the number of records in this stream by the grouped key and the defined windows.
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 113458e..7aa0648 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -88,7 +88,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
       userClicksStream
 
         // Join the stream against the table.
-        .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
+        .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
 
         // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
         .map((_, regionWithClicks) => regionWithClicks)
@@ -180,7 +180,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
     streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath())
+    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath)
 
     streamsConfiguration
   }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 9495ea7..e8b9f0f 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -142,7 +142,7 @@ class TopologyTest extends JUnitSuite {
   
       val clicksPerRegion: KTable[String, Long] =
         userClicksStream
-          .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
+          .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
           .map((_, regionWithClicks) => regionWithClicks)
           .groupByKey
           .reduce(_ + _)

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.
