kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix streams Scala foreach recursive call (#5539)
Date Fri, 24 Aug 2018 00:07:45 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8559de  MINOR: Fix streams Scala foreach recursive call (#5539)
b8559de is described below

commit b8559de23d120ca07daa6f66de6bba253d16a74a
Author: Joan Goyeau <joan@goyeau.com>
AuthorDate: Fri Aug 24 01:07:39 2018 +0100

    MINOR: Fix streams Scala foreach recursive call (#5539)
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>
---
 .../apache/kafka/streams/scala/FunctionConversions.scala |  6 ++++++
 .../org/apache/kafka/streams/scala/kstream/KStream.scala | 12 +++++-------
 .../org/apache/kafka/streams/scala/KStreamTest.scala     | 16 ++++++++++++++++
 3 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
index 65ea490..566ba22 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
@@ -34,6 +34,12 @@ import java.lang.{Iterable => JIterable}
  */
 object FunctionConversions {
 
 +  implicit private[scala] class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
+    def asForeachAction: ForeachAction[K, V] = new ForeachAction[K, V] {
+      override def apply(key: K, value: V): Unit = p(key, value)
+    }
+  }
+
   implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal {
     def asPredicate: Predicate[K, V] = new Predicate[K, V] {
       override def test(key: K, value: V): Boolean = p(key, value)
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index adc1850..2bcbf04 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream
 
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
 -import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor}
+import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
@@ -84,10 +84,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with new key and value (possibly both of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#map`
    */
-  def map[KR, VR](mapper: (K, V) => (KR, VR)): KStream[KR, VR] = {
-    val kvMapper = mapper.tupled andThen tuple2ToKeyValue
-    inner.map[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
-  }
+  def map[KR, VR](mapper: (K, V) => (KR, VR)): KStream[KR, VR] =
+    inner.map[KR, VR](mapper.asKeyValueMapper)
 
   /**
    * Transform the value of each input record into a new value (with possible new type) of the output record.
@@ -124,7 +122,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#flatMap`
    */
   def flatMap[KR, VR](mapper: (K, V) => Iterable[(KR, VR)]): KStream[KR, VR] = {
-    val kvMapper = mapper.tupled andThen (iter => iter.map(tuple2ToKeyValue).asJava)
+    val kvMapper = mapper.tupled.andThen(_.map(tuple2ToKeyValue).asJava)
     inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
   }
 
@@ -173,7 +171,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#foreach`
    */
   def foreach(action: (K, V) => Unit): Unit =
-    inner.foreach((k: K, v: V) => action(k, v))
+    inner.foreach(action.asForeachAction)
 
   /**
    * Creates an array of `KStream` from this stream by branching the records in the original stream based on
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
index 2e2132d..e70d900 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -75,6 +75,22 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     testDriver.close()
   }
 
+  "foreach a KStream" should "side effect records" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+
+    var acc = ""
+    builder.stream[String, String](sourceTopic).foreach((_, value) => acc += value)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+    acc shouldBe "value1value2"
+
+    testDriver.close()
+  }
+
   "selectKey a KStream" should "select a new key" in {
     val builder = new StreamsBuilder()
     val sourceTopic = "source"


Mime
View raw message