kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6936: Implicit materialized for aggregate, count and reduce (#5066)
Date Fri, 01 Jun 2018 00:20:00 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ad56f04  KAFKA-6936: Implicit materialized for aggregate, count and reduce (#5066)
ad56f04 is described below

commit ad56f04af9d6173628f3ec41b0d9fd9f35855c14
Author: Joan Goyeau <joan@goyeau.com>
AuthorDate: Fri Jun 1 01:19:37 2018 +0100

    KAFKA-6936: Implicit materialized for aggregate, count and reduce (#5066)
    
    In #4919 we propagate the SerDes for each of these aggregation operators.
    
    As @guozhangwang mentioned in that PR:
    
    ```
    reduce: inherit the key and value serdes from the parent XXImpl class.
    count: inherit the key serdes, enforce setting the Serdes.Long() for value serdes.
    aggregate: inherit the key serdes, do not set for value serdes internally.
    ```
    
    Although this is fine for reduce and count, it is quite unsafe to have aggregate without a
Materialized given. In fact, I don't see why we would not give a Materialized for aggregate,
since the result type will always be different (otherwise one would use reduce) and the value Serde
is simply not propagated.
    
    This has been discussed previously in a broader PR, but I believe that for aggregate
we could implicitly pass a Materialized the same way we pass a Joined, just to avoid the unsafe
case. Then, if users want to specialize, they can provide their own Materialized.
    
    Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>
---
 .../kafka/streams/scala/ImplicitConversions.scala  | 17 +++++--
 .../streams/scala/kstream/KGroupedStream.scala     | 56 ++++-----------------
 .../streams/scala/kstream/KGroupedTable.scala      | 58 +++-------------------
 .../scala/kstream/SessionWindowedKStream.scala     | 53 +++-----------------
 .../scala/kstream/TimeWindowedKStream.scala        | 49 +++---------------
 .../apache/kafka/streams/scala/WordCountTest.scala |  4 +-
 6 files changed, 46 insertions(+), 191 deletions(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index c32563f..0c384a1 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -19,19 +19,22 @@
  */
 package org.apache.kafka.streams.scala
 
-import org.apache.kafka.streams.kstream.{KStream => KStreamJ,
-  KTable => KTableJ,
+import org.apache.kafka.streams.kstream.{
   KGroupedStream => KGroupedStreamJ,
+  KGroupedTable => KGroupedTableJ,
+  KStream => KStreamJ,
+  KTable => KTableJ,
   SessionWindowedKStream => SessionWindowedKStreamJ,
   TimeWindowedKStream => TimeWindowedKStreamJ,
-  KGroupedTable => KGroupedTableJ, _}
-
+  _
+}
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.common.serialization.Serde
-
 import scala.language.implicitConversions
 
+import org.apache.kafka.streams.processor.StateStore
+
 /**
  * Implicit conversions between the Scala wrapper objects and the underlying Java
  * objects.
@@ -70,6 +73,10 @@ object ImplicitConversions {
   implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]):
Produced[K, V] =
     Produced.`with`(keySerde, valueSerde)
 
+  implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K],
+                                                            valueSerde: Serde[V]): Materialized[K,
V, S] =
+    Materialized.`with`[K, V, S](keySerde, valueSerde)
+
   implicit def joinedFromKeyValueOtherSerde[K, V, VO]
     (implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: Serde[VO]): Joined[K,
V, VO] =
     Joined.`with`(keySerde, valueSerde, otherValueSerde)
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 2e85bce..0e5abfd 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -38,18 +38,6 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
 
   /**
    * Count the number of records in this stream by the grouped key.
-   *
-   * @return a [[KTable]] that contains "update" records with unmodified keys and `Long`
values that
-   * represent the latest (rolling) count (i.e., number of records) for each key
-   * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
-   */ 
-  def count(): KTable[K, Long] = {
-    val c: KTable[K, java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long _)
-  }
-
-  /**
-   * Count the number of records in this stream by the grouped key.
    * The result is written into a local `KeyValueStore` (which is basically an ever-updating
materialized view)
    * provided by the given `materialized`.
    *
@@ -57,8 +45,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * @return a [[KTable]] that contains "update" records with unmodified keys and `Long`
values that
    * represent the latest (rolling) count (i.e., number of records) for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
-   */ 
-  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long]
= {
+   */
+  def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K,
Long] = {
     val c: KTable[K, java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
     c.mapValues[Long](Long2long _)
@@ -68,53 +56,29 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * Combine the values of records in this stream by the grouped key.
    *
    * @param reducer   a function `(V, V) => V` that computes a new aggregate result. 
-   * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
-   */ 
-  def reduce(reducer: (V, V) => V): KTable[K, V] =
-    inner.reduce(reducer.asReducer)
-
-  /**
-   * Combine the values of records in this stream by the grouped key.
-   *
-   * @param reducer   a function `(V, V) => V` that computes a new aggregate result. 
    * @param materialized  an instance of `Materialized` used to materialize a state store.

    * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
-   */ 
-  def reduce(reducer: (V, V) => V, materialized: Materialized[K, V, ByteArrayKeyValueStore]):
KTable[K, V] = {
-
+   */
+  def reduce(reducer: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]):
KTable[K, V] =
     // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take
place
     // works perfectly with Scala 2.12 though
-    inner.reduce(((v1: V, v2: V) => reducer(v1, v2)).asReducer, materialized)
-  }
-
-  /**
-   * Aggregate the values of records in this stream by the grouped key.
-   *
-   * @param initializer   an `Initializer` that computes an initial intermediate aggregation
result
-   * @param aggregator    an `Aggregator` that computes a new aggregate result
-   * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
-   */ 
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): KTable[K, VR]
=
-    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator)
+    inner.reduce(reducer.asReducer, materialized)
 
   /**
    * Aggregate the values of records in this stream by the grouped key.
    *
    * @param initializer   an `Initializer` that computes an initial intermediate aggregation
result
    * @param aggregator    an `Aggregator` that computes a new aggregate result
-   * @param materialized  an instance of `Materialized` used to materialize a state store.

+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
-   */ 
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
-                                        materialized: Materialized[K, VR, ByteArrayKeyValueStore]):
KTable[K, VR] =
+   */
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
+    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 87a11c5..99bc83e 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -39,25 +39,12 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to
    * the same key into a new instance of [[KTable]].
    *
-   * @return a [[KTable]] that contains "update" records with unmodified keys and `Long`
values that
-   * represent the latest (rolling) count (i.e., number of records) for each key
-   * @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
-   */
-  def count(): KTable[K, Long] = {
-    val c: KTable[K, java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long _)
-  }
-
-  /**
-   * Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to
-   * the same key into a new instance of [[KTable]].
-   *
    * @param materialized  an instance of `Materialized` used to materialize a state store.

    * @return a [[KTable]] that contains "update" records with unmodified keys and `Long`
values that
    * represent the latest (rolling) count (i.e., number of records) for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
    */
-  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long]
= {
+  def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K,
Long] = {
     val c: KTable[K, java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
     c.mapValues[Long](Long2long _)
@@ -69,30 +56,13 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    *
    * @param adder      a function that adds a new value to the aggregate result
    * @param subtractor a function that removed an old value from the aggregate result
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
    */
   def reduce(adder: (V, V) => V,
-             subtractor: (V, V) => V): KTable[K, V] =
-    // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take
place
-    // works perfectly with Scala 2.12 though
-    inner.reduce(adder.asReducer, subtractor.asReducer)
-
-  /**
-   * Combine the value of records of the original [[KTable]] that got [[KTable#groupBy]]
-   * to the same key into a new instance of [[KTable]].
-   *
-   * @param adder      a function that adds a new value to the aggregate result
-   * @param subtractor a function that removed an old value from the aggregate result
-   * @param materialized  an instance of `Materialized` used to materialize a state store.

-   * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
-   */
-  def reduce(adder: (V, V) => V,
-             subtractor: (V, V) => V,
-             materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+             subtractor: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]):
KTable[K, V] =
     // need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take
place
     // works perfectly with Scala 2.12 though
     inner.reduce(adder.asReducer, subtractor.asReducer, materialized)
@@ -104,27 +74,13 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * @param initializer a function that provides an initial aggregate result value
    * @param adder       a function that adds a new record to the aggregate result
    * @param subtractor  an aggregator function that removed an old record from the aggregate
result
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V,
VR) => VR): KTable[K, VR] =
-    inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator)
-
-  /**
-   * Aggregate the value of records of the original [[KTable]] that got [[KTable#groupBy]]
-   * to the same key into a new instance of [[KTable]] using default serializers and deserializers.
-   *
-   * @param initializer a function that provides an initial aggregate result value
-   * @param adder       a function that adds a new record to the aggregate result
-   * @param subtractor  an aggregator function that removed an old record from the aggregate
result
-   * @param materialized  an instance of `Materialized` used to materialize a state store.

-   * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
-   */
-  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR,
-                                        subtractor: (K, V, VR) => VR,
-                                        materialized: Materialized[K, VR, ByteArrayKeyValueStore]):
KTable[K, VR] =
+  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V,
VR) => VR)(
+    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator,
materialized)
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index fd2a565..ed41973 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -40,54 +40,27 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K,
V]) {
    *
    * @param initializer    the initializer function
    * @param aggregator     the aggregator function
-   * @param sessionMerger  the merger function
+   * @param merger         the merger function
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and
values that represent
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
   def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
-                                        merger: (K, VR, VR) => VR): KTable[Windowed[K],
VR] =
-    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger)
-
-  /**
-   * Aggregate the values of records in this stream by the grouped key and defined `SessionWindows`.
-   *
-   * @param initializer    the initializer function
-   * @param aggregator     the aggregator function
-   * @param sessionMerger  the merger function
-   * @param materialized  an instance of `Materialized` used to materialize a state store.

-   * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and
values that represent
-   * the latest (rolling) aggregate for each key within a window
-   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
-   */
-  def aggregate[VR](initializer: => VR)(
-    aggregator: (K, V, VR) => VR,
-    merger: (K, VR, VR) => VR,
-    materialized: Materialized[K, VR, ByteArraySessionStore]
+                                        merger: (K, VR, VR) => VR)(
+    implicit materialized: Materialized[K, VR, ByteArraySessionStore]
   ): KTable[Windowed[K], VR] =
     inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger,
materialized)
 
   /**
    * Count the number of records in this stream by the grouped key into `SessionWindows`.
    *
-   * @return a windowed [[KTable]] that contains "update" records with unmodified keys and
`Long` values
-   * that represent the latest (rolling) count (i.e., number of records) for each key within
a window
-   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
-   */
-  def count(): KTable[Windowed[K], Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long _)
-  }
-
-  /**
-   * Count the number of records in this stream by the grouped key into `SessionWindows`.
-   *
    * @param materialized  an instance of `Materialized` used to materialize a state store.

    * @return a windowed [[KTable]] that contains "update" records with unmodified keys and
`Long` values
    * that represent the latest (rolling) count (i.e., number of records) for each key within
a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
    */
-  def count(materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K],
Long] = {
+  def count()(implicit materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K],
Long] = {
     val c: KTable[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]])
     c.mapValues[Long](Long2long _)
@@ -97,23 +70,13 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K,
V]) {
    * Combine values of this stream by the grouped key into {@link SessionWindows}.
    *
    * @param reducer           a reducer function that computes a new aggregate result. 
-   * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and
values that represent
-   * the latest (rolling) aggregate for each key within a window
-   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
-   */
-  def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] =
-    inner.reduce((v1, v2) => reducer(v1, v2))
-
-  /**
-   * Combine values of this stream by the grouped key into {@link SessionWindows}.
-   *
-   * @param reducer           a reducer function that computes a new aggregate result. 
    * @param materialized  an instance of `Materialized` used to materialize a state store.

    * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and
values that represent
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
    */
-  def reduce(reducer: (V, V) => V,
-    materialized: Materialized[K, V, ByteArraySessionStore]): KTable[Windowed[K], V] =
+  def reduce(reducer: (V, V) => V)(
+    implicit materialized: Materialized[K, V, ByteArraySessionStore]
+  ): KTable[Windowed[K], V] =
     inner.reduce(reducer.asReducer, materialized)
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index a16c72b..9e31ab9 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -40,50 +40,25 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V])
{
    *
    * @param initializer   an initializer function that computes an initial intermediate aggregation
result
    * @param aggregator    an aggregator function that computes a new aggregate result
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR): KTable[Windowed[K],
VR] =
-    inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator)
-
-  /**
-   * Aggregate the values of records in this stream by the grouped key.
-   *
-   * @param initializer   an initializer function that computes an initial intermediate aggregation
result
-   * @param aggregator    an aggregator function that computes a new aggregate result
-   * @param materialized  an instance of `Materialized` used to materialize a state store.

-   * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
-   */
-  def aggregate[VR](initializer: => VR)(
-    aggregator: (K, V, VR) => VR,
-    materialized: Materialized[K, VR, ByteArrayWindowStore]
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
+    implicit materialized: Materialized[K, VR, ByteArrayWindowStore]
   ): KTable[Windowed[K], VR] =
     inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)
 
   /**
    * Count the number of records in this stream by the grouped key and the defined windows.
    *
-   * @return a [[KTable]] that contains "update" records with unmodified keys and `Long`
values that
-   * represent the latest (rolling) count (i.e., number of records) for each key
-   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
-   */ 
-  def count(): KTable[Windowed[K], Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] = inner.count()
-    c.mapValues[Long](Long2long _)
-  }
-
-  /**
-   * Count the number of records in this stream by the grouped key and the defined windows.
-   *
    * @param materialized  an instance of `Materialized` used to materialize a state store.

    * @return a [[KTable]] that contains "update" records with unmodified keys and `Long`
values that
    * represent the latest (rolling) count (i.e., number of records) for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
    */ 
-  def count(materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K],
Long] = {
+  def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K],
Long] = {
     val c: KTable[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
     c.mapValues[Long](Long2long _)
@@ -93,23 +68,13 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V])
{
    * Combine the values of records in this stream by the grouped key.
    *
    * @param reducer   a function that computes a new aggregate result
-   * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
-   * latest (rolling) aggregate for each key
-   * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
-   */
-  def reduce(reducer: (V, V) => V): KTable[Windowed[K], V] =
-    inner.reduce(reducer.asReducer)
-
-  /**
-   * Combine the values of records in this stream by the grouped key.
-   *
-   * @param reducer   a function that computes a new aggregate result
    * @param materialized  an instance of `Materialized` used to materialize a state store.

    * @return a [[KTable]] that contains "update" records with unmodified keys, and values
that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
    */
-  def reduce(reducer: (V, V) => V,
-    materialized: Materialized[K, V, ByteArrayWindowStore]): KTable[Windowed[K], V] =
+  def reduce(reducer: (V, V) => V)(
+    implicit materialized: Materialized[K, V, ByteArrayWindowStore]
+  ): KTable[Windowed[K], V] =
     inner.reduce(reducer.asReducer, materialized)
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index 12b8c8c..5abc1bc 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -87,7 +87,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
     // generate word counts
     val wordCounts: KTable[String, Long] =
       textLines.flatMapValues(v => pattern.split(v.toLowerCase))
-        .groupBy((k, v) => v)
+        .groupBy((_, v) => v)
         .count()
 
     // write to output topic
@@ -119,7 +119,7 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
     val wordCounts: KTable[String, Long] =
       textLines.flatMapValues(v => pattern.split(v.toLowerCase))
         .groupBy((k, v) => v)
-        .count(Materialized.as("word-count"))
+        .count()(Materialized.as("word-count"))
 
     // write to output topic
     wordCounts.toStream.to(outputTopic)

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message