kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7301: Fix streams Scala join ambiguous overload (#5502)
Date Tue, 21 Aug 2018 22:41:46 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dae9c41  KAFKA-7301: Fix streams Scala join ambiguous overload (#5502)
dae9c41 is described below

commit dae9c41838e745d51dba4e6be18359d42d5ff8a3
Author: Joan Goyeau <joan@goyeau.com>
AuthorDate: Tue Aug 21 23:41:36 2018 +0100

    KAFKA-7301: Fix streams Scala join ambiguous overload (#5502)
    
    Join in the Scala streams API is currently unusable in 2.0.0 as reported by @mowczare:
    #5019 (comment)
    
    This is due to an overload of it with the same signature in the first curried parameter.
    See compiler issue that didn't catch it: https://issues.scala-lang.org/browse/SI-2628
    
    Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>,
John Roesler <john@confluent.io>
---
 build.gradle                                       |  1 +
 .../kafka/streams/scala/kstream/KTable.scala       | 16 ++---
 .../apache/kafka/streams/scala/KStreamTest.scala   | 70 +++++++++++++++++++
 .../apache/kafka/streams/scala/KTableTest.scala    | 79 ++++++++++++++++++++++
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala | 16 +----
 ...StreamToTableJoinScalaIntegrationTestBase.scala |  2 +-
 .../{ => utils}/StreamToTableJoinTestData.scala    |  2 +-
 .../kafka/streams/scala/utils/TestDriver.scala     | 52 ++++++++++++++
 8 files changed, 213 insertions(+), 25 deletions(-)

diff --git a/build.gradle b/build.gradle
index c1387d4..c963253 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1021,6 +1021,7 @@ project(':streams:streams-scala') {
     testCompile project(':core').sourceSets.test.output
     testCompile project(':streams').sourceSets.test.output
     testCompile project(':clients').sourceSets.test.output
+    testCompile project(':streams:test-utils')
 
     testCompile libs.junit
     testCompile libs.scalatest
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index b669771..a78d321 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -20,6 +20,7 @@
 package org.apache.kafka.streams.scala
 package kstream
 
+import org.apache.kafka.common.serialization.Serde
 import org.apache.kafka.common.utils.Bytes
 import org.apache.kafka.streams.kstream.{KTable => KTableJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
@@ -245,9 +246,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#join`
    */
-  def join[VO, VR](other: KTable[K, VO])(
-    joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def join[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(
+    joiner: (V, VO) => VR
   ): KTable[K, VR] =
     inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
@@ -274,9 +274,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
    */
-  def leftJoin[VO, VR](other: KTable[K, VO])(
-    joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def leftJoin[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(
+    joiner: (V, VO) => VR
   ): KTable[K, VR] =
     inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
@@ -303,9 +302,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
    */
-  def outerJoin[VO, VR](other: KTable[K, VO])(
-    joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def outerJoin[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(
+    joiner: (V, VO) => VR
   ): KTable[K, VR] =
     inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
new file mode 100644
index 0000000..6a302b2
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.kstream.JoinWindows
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.utils.TestDriver
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class KStreamTest extends FlatSpec with Matchers with TestDriver {
+
+  "selectKey a KStream" should "select a new key" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    builder.stream[String, String](sourceTopic).selectKey((_, value) => value).to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    testDriver.readRecord[String, String](sinkTopic).key shouldBe "value1"
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value2"))
+    testDriver.readRecord[String, String](sinkTopic).key shouldBe "value2"
+
+    testDriver.close()
+  }
+
+  "join 2 KStreams" should "join correctly records" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic1 = "source1"
+    val sourceTopic2 = "source2"
+    val sinkTopic = "sink"
+
+    val stream1 = builder.stream[String, String](sourceTopic1)
+    val stream2 = builder.stream[String, String](sourceTopic2)
+    stream1.join(stream2)((a, b) => s"$a-$b", JoinWindows.of(1000)).to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
+    testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "topic1value1-topic2value1"
+
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
new file mode 100644
index 0000000..8c88ff5
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.kstream.Materialized
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.utils.TestDriver
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class KTableTest extends FlatSpec with Matchers with TestDriver {
+
+  "join 2 KTables" should "join correctly records" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic1 = "source1"
+    val sourceTopic2 = "source2"
+    val sinkTopic = "sink"
+
+    val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count()
+    val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count()
+    table1.join(table2)((a, b) => a + b).toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
+    testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
+    testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2
+
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "join 2 KTables with a Materialized" should "join correctly records and state store" in
{
+    val builder = new StreamsBuilder()
+    val sourceTopic1 = "source1"
+    val sourceTopic2 = "source2"
+    val sinkTopic = "sink"
+    val stateStore = "store"
+    val materialized = Materialized
+      .as[String, Long, ByteArrayKeyValueStore](stateStore)
+      .withKeySerde(Serdes.String)
+      .withValueSerde(Serdes.Long)
+
+    val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count()
+    val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count()
+    table1.join(table2, materialized)((a, b) => a + b).toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
+    testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
+    testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2
+    testDriver.getKeyValueStore[String, Long](stateStore).get("1") shouldBe 2
+
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 3d1bab5..fd5f361 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -18,20 +18,11 @@ package org.apache.kafka.streams.scala
 
 import java.util.Properties
 
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization._
-import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.streams._
-import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
-import org.apache.kafka.streams.processor.internals.StreamThread
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.kstream._
-import org.apache.kafka.test.TestUtils
-import org.junit.Assert._
+import org.apache.kafka.streams.scala.utils.StreamToTableJoinScalaIntegrationTestBase
 import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.scalatest.junit.JUnitSuite
 
 /**
  * Test suite that does an example to demonstrate stream-table joins in Kafka Streams
@@ -141,10 +132,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
     val streams: KafkaStreamsJ = new KafkaStreamsJ(builder.build(), streamsConfiguration)
 
     streams.start()
-
-    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
-      produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
-
+    produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
     streams.close()
   }
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
similarity index 99%
rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
rename to streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
index cf87eb5..9a3ee7f 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.scala
+package org.apache.kafka.streams.scala.utils
 
 import java.util.Properties
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinTestData.scala
similarity index 97%
rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
rename to streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinTestData.scala
index e9040ee..890d8c2 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinTestData.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.scala
+package org.apache.kafka.streams.scala.utils
 
 import org.apache.kafka.streams.KeyValue
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
new file mode 100644
index 0000000..1497dd7
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.utils
+
+import java.util.{Properties, UUID}
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.scala.StreamsBuilder
+import org.apache.kafka.streams.test.ConsumerRecordFactory
+import org.apache.kafka.streams.{StreamsConfig, TopologyTestDriver}
+import org.scalatest.Suite
+
+trait TestDriver { this: Suite =>
+
+  def createTestDriver(builder: StreamsBuilder, initialWallClockTimeMs: Long = System.currentTimeMillis())
= {
+    val config = new Properties()
+    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
+    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
+    config.put(StreamsConfig.STATE_DIR_CONFIG, s"out/state-store-${UUID.randomUUID()}")
+    new TopologyTestDriver(builder.build(), config, initialWallClockTimeMs)
+  }
+
+  implicit class TopologyTestDriverOps(inner: TopologyTestDriver) {
+    def pipeRecord[K, V](topic: String, record: (K, V), timestampMs: Long = System.currentTimeMillis())(
+      implicit serdeKey: Serde[K],
+      serdeValue: Serde[V]
+    ): Unit = {
+      val recordFactory = new ConsumerRecordFactory[K, V](serdeKey.serializer, serdeValue.serializer)
+      inner.pipeInput(recordFactory.create(topic, record._1, record._2, timestampMs))
+    }
+
+    def readRecord[K, V](topic: String)(implicit serdeKey: Serde[K], serdeValue: Serde[V]):
ProducerRecord[K, V] =
+      inner.readOutput(topic, serdeKey.deserializer, serdeValue.deserializer)
+  }
+}


Mime
View raw message