kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-7316: Fix Streams Scala filter recursive call #5538
Date: Thu, 23 Aug 2018 16:15:33 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d8f9f27  KAFKA-7316: Fix Streams Scala filter recursive call #5538
d8f9f27 is described below

commit d8f9f278a2326f3d1ef872a21ef1b43593df571f
Author: Joan Goyeau <joan@goyeau.com>
AuthorDate: Thu Aug 23 17:15:27 2018 +0100

    KAFKA-7316: Fix Streams Scala filter recursive call #5538
    
    Due to the lack of a conversion to the kstream Predicate, the existing filter and filterNot methods in KTable.scala would recursively call themselves, resulting in a StackOverflowError.
    
    This PR fixes the bug and adds tests for it.
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>
---
 .../kafka/streams/scala/kstream/KTable.scala       |  4 +-
 .../apache/kafka/streams/scala/KStreamTest.scala   | 48 ++++++++++++++++
 .../apache/kafka/streams/scala/KTableTest.scala    | 66 ++++++++++++++++++++++
 3 files changed, 116 insertions(+), 2 deletions(-)
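
For context on the fix: the Scala (K, V) => Boolean passed to filter does not match the
Predicate parameter of the underlying Java KTable, so the old call ended up resolving back
to the Scala filter itself (presumably via the implicit KTableJ-to-KTable wrapper), hence
the recursion and the eventual StackOverflowError. The fix converts the function explicitly
before handing it to the Java API. Below is a minimal sketch of such a conversion, in the
spirit of the asPredicate call used in the hunk that follows; the object and method names
here are illustrative, not the actual Kafka internals:

    import org.apache.kafka.streams.kstream.Predicate

    object PredicateConversionSketch {
      // Wrap a Scala (K, V) => Boolean as the Java-API Predicate that the Java KTable.filter expects.
      implicit class ScalaPredicateOps[K, V](val p: (K, V) => Boolean) extends AnyVal {
        def asJavaPredicate: Predicate[K, V] = new Predicate[K, V] {
          override def test(key: K, value: V): Boolean = p(key, value)
        }
      }
    }

With a conversion like this in scope, a call such as inner.filter(predicate.asJavaPredicate)
targets the Java overload directly instead of bouncing back into the Scala wrapper, which is
what the asPredicate conversion in the changed lines below achieves.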

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index a78d321..d41496f 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -47,7 +47,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#filter`
    */
   def filter(predicate: (K, V) => Boolean): KTable[K, V] =
-    inner.filter(predicate(_, _))
+    inner.filter(predicate.asPredicate)
 
   /**
   * Create a new [[KTable]] that consists all records of this [[KTable]] which satisfies the given
@@ -71,7 +71,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#filterNot`
    */
   def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
-    inner.filterNot(predicate(_, _))
+    inner.filterNot(predicate.asPredicate)
 
   /**
   * Create a new [[KTable]] that consists all records of this [[KTable]] which do <em>not</em> satisfy the given
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
index 6a302b2..2e2132d 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -29,6 +29,52 @@ import org.scalatest.{FlatSpec, Matchers}
 @RunWith(classOf[JUnitRunner])
 class KStreamTest extends FlatSpec with Matchers with TestDriver {
 
+  "filter a KStream" should "filter records satisfying the predicate" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    builder.stream[String, String](sourceTopic).filter((_, value) => value != "value2").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
+    testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+    testDriver.pipeRecord(sourceTopic, ("3", "value3"))
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value3"
+
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "filterNot a KStream" should "filter records not satisfying the predicate" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    builder.stream[String, String](sourceTopic).filterNot((_, value) => value == "value2").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
+    testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+    testDriver.pipeRecord(sourceTopic, ("3", "value3"))
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value3"
+
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
   "selectKey a KStream" should "select a new key" in {
     val builder = new StreamsBuilder()
     val sourceTopic = "source"
@@ -44,6 +90,8 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     testDriver.pipeRecord(sourceTopic, ("1", "value2"))
     testDriver.readRecord[String, String](sinkTopic).key shouldBe "value2"
 
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
     testDriver.close()
   }
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
index 8c88ff5..2e9c821 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
@@ -29,6 +29,72 @@ import org.scalatest.{FlatSpec, Matchers}
 @RunWith(classOf[JUnitRunner])
 class KTableTest extends FlatSpec with Matchers with TestDriver {
 
+  "filter a KTable" should "filter records satisfying the predicate" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
+    table.filter((_, value) => value > 1).toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe (null: java.lang.Long)
+    }
+    {
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe 2
+    }
+    {
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "2"
+      record.value shouldBe (null: java.lang.Long)
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "filterNot a KTable" should "filter records not satisfying the predicate" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
+    table.filterNot((_, value) => value > 1).toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe 1
+    }
+    {
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe (null: java.lang.Long)
+    }
+    {
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "2"
+      record.value shouldBe 1
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
   "join 2 KTables" should "join correctly records" in {
     val builder = new StreamsBuilder()
     val sourceTopic1 = "source1"

