kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix streams Scala peek recursive call (#5566)
Date Wed, 29 Aug 2018 16:34:10 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 34d9ae6  MINOR: Fix streams Scala peek recursive call (#5566)
34d9ae6 is described below

commit 34d9ae66281602dad3a70ecd07ca0f2b6237ae8c
Author: tedyu <yuzhihong@gmail.com>
AuthorDate: Wed Aug 29 09:34:01 2018 -0700

    MINOR: Fix streams Scala peek recursive call (#5566)
    
    This PR fixes the previously recursive call of Streams Scala peek
    
    Reviewers: Joan Goyeau <joan@goyeau.com>, Guozhang Wang <guozhang@confluent.io>,
John Roesler <john@confluent.io>
---
 .../kafka/streams/scala/kstream/KStream.scala      |  2 +-
 .../apache/kafka/streams/scala/KStreamTest.scala   | 25 +++++++++++++++++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 2bcbf04..8e4c9aa 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -573,5 +573,5 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#peek`
    */
   def peek(action: (K, V) => Unit): KStream[K, V] =
-    inner.peek((k: K, v: V) => action(k, v))
+    inner.peek(action.asForeachAction)
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
index e70d900..3fdfee6 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -75,7 +75,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     testDriver.close()
   }
 
-  "foreach a KStream" should "side effect records" in {
+  "foreach a KStream" should "run foreach actions on records" in {
     val builder = new StreamsBuilder()
     val sourceTopic = "source"
 
@@ -85,8 +85,31 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     val testDriver = createTestDriver(builder)
 
     testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    acc shouldBe "value1"
+
+    testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+    acc shouldBe "value1value2"
+
+    testDriver.close()
+  }
+
+  "peek a KStream" should "run peek actions on records" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    var acc = ""
+    builder.stream[String, String](sourceTopic).peek((k, v) => acc += v).to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    acc shouldBe "value1"
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
     testDriver.pipeRecord(sourceTopic, ("2", "value2"))
     acc shouldBe "value1value2"
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value2"
 
     testDriver.close()
   }


Mime
View raw message