kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [kafka] branch 2.0 updated: KAFKA-7301: Fix streams Scala join ambiguous overload (#5502)
Date: Tue, 21 Aug 2018 23:38:27 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 2971cfc  KAFKA-7301: Fix streams Scala join ambiguous overload (#5502)
2971cfc is described below

commit 2971cfccbb0bb951e8e93f3fb6622c990b27d63e
Author: Joan Goyeau <joan@goyeau.com>
AuthorDate: Tue Aug 21 23:41:36 2018 +0100

    KAFKA-7301: Fix streams Scala join ambiguous overload (#5502)
    
    Join in the Scala streams API is currently unusable in 2.0.0, as reported by @mowczare.
    
    This is due to an overload with the same signature in the first curried parameter list.
    See the compiler issue that didn't catch it: https://issues.scala-lang.org/browse/SI-2628
    
    Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>
    
    minor
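
For context, the ambiguity arises because two curried overloads that share an identical first parameter list cannot be told apart from the first argument list alone, so the compiler refuses to infer the lambda's parameter types. A minimal sketch of the failure mode, using a hypothetical Table class rather than the real Kafka Streams API:

    class Table[K, V] {
      // Two overloads whose first parameter lists are identical:
      def join[VR](other: Table[K, V])(joiner: (V, V) => VR): Table[K, VR] = ???
      def join[VR](other: Table[K, V])(joiner: (V, V) => VR, materialized: String): Table[K, VR] = ???
    }

    // With both overloads in scope, this call fails to compile
    // ("missing parameter type"): overload resolution blocks
    // expected-type-driven inference for the function literal.
    //   t1.join(t2)((a, b) => a + b)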
---
 build.gradle                                       |   1 +
 .../integration/utils/IntegrationTestUtils.java    |  39 ++++++
 .../kafka/streams/scala/kstream/KTable.scala       |  16 ++-
 .../apache/kafka/streams/scala/KStreamTest.scala   |  70 +++++++++++
 .../apache/kafka/streams/scala/KTableTest.scala    |  79 ++++++++++++
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |  15 +--
 ...StreamToTableJoinScalaIntegrationTestBase.scala | 135 +++++++++++++++++++++
 .../{ => utils}/StreamToTableJoinTestData.scala    |   2 +-
 .../kafka/streams/scala/utils/TestDriver.scala     |  52 ++++++++
 9 files changed, 388 insertions(+), 21 deletions(-)

diff --git a/build.gradle b/build.gradle
index ea2e9de..3579ab8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1008,6 +1008,7 @@ project(':streams:streams-scala') {
     testCompile project(':core').sourceSets.test.output
     testCompile project(':streams').sourceSets.test.output
     testCompile project(':clients').sourceSets.test.output
+    testCompile project(':streams:test-utils')
 
     testCompile libs.junit
     testCompile libs.scalatest
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 35521c6..9146d6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 /**
  * Utility functions to make integration testing more convenient.
@@ -322,6 +323,44 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                                    final String topic,
+                                                                                    final List<KeyValue<K, V>> expectedRecords) throws InterruptedException {
+        return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT);
+    }
+
+    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                                    final String topic,
+                                                                                    final List<KeyValue<K, V>> expectedRecords,
+                                                                                    final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, V>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = () -> {
+                final List<KeyValue<K, V>> readData =
+                    readKeyValues(topic, consumer, waitTime, expectedRecords.size());
+                accumData.addAll(readData);
+
+                final int accumLastIndex = accumData.size() - 1;
+                final int expectedLastIndex = expectedRecords.size() - 1;
+
+                // filter out all intermediate records we don't want
+                final List<KeyValue<K, V>> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
+
+                // need this check as filtering above could have removed the last record from accumData,
+                // but it did not equal the last expected record
+                final boolean lastRecordsMatch = accumData.get(accumLastIndex).equals(expectedRecords.get(expectedLastIndex));
+
+                // returns true only if the remaining records in both lists are the same and in the same order
+                // and the last record received matches the last expected record
+                return accumulatedActual.equals(expectedRecords) && lastRecordsMatch;
+
+            };
+            final String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
     public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
                                                                                 final String topic,
                                                                                 final int expectedNumRecords,
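
The new helper is exercised from the Scala integration base added below; a minimal sketch of the call shape, assuming a configured consumer Properties and an ordered list of expected final records (it mirrors produceNConsume in StreamToTableJoinScalaIntegrationTestBase):

    import scala.collection.JavaConverters._

    // Blocks until the topic's matching records equal the expected list,
    // in order, and the last record received is the last one expected.
    val received = IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
      consumerConfig,                  // consumer Properties for the output topic
      outputTopic,                     // topic under verification
      expectedClicksPerRegion.asJava   // expected final KeyValue records
    )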
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index b669771..a78d321 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -20,6 +20,7 @@
 package org.apache.kafka.streams.scala
 package kstream
 
+import org.apache.kafka.common.serialization.Serde
 import org.apache.kafka.common.utils.Bytes
 import org.apache.kafka.streams.kstream.{KTable => KTableJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
@@ -245,9 +246,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#join`
    */
-  def join[VO, VR](other: KTable[K, VO])(
-    joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def join[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(
+    joiner: (V, VO) => VR
   ): KTable[K, VR] =
     inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
@@ -274,9 +274,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
    */
-  def leftJoin[VO, VR](other: KTable[K, VO])(
-    joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def leftJoin[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(
+    joiner: (V, VO) => VR
   ): KTable[K, VR] =
     inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
@@ -303,9 +302,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
    */
-  def outerJoin[VO, VR](other: KTable[K, VO])(
-    joiner: (V, VO) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def outerJoin[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(
+    joiner: (V, VO) => VR
   ): KTable[K, VR] =
     inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
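With this change, the Materialized argument moves into the first parameter list alongside the other table, so the plain-join and materialized-join overloads no longer share a first parameter list and call sites resolve cleanly. A brief usage sketch, assuming implicit serdes are in scope (it mirrors the KTableTest added below):

    // Plain join: single-argument first list, joiner in the second
    val joined: KTable[String, Long] = table1.join(table2)((a, b) => a + b)

    // Materialized join: the two-argument first list picks the overload
    val stored: KTable[String, Long] = table1.join(table2, materialized)((a, b) => a + b)
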
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
new file mode 100644
index 0000000..6a302b2
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.kstream.JoinWindows
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.utils.TestDriver
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class KStreamTest extends FlatSpec with Matchers with TestDriver {
+
+  "selectKey a KStream" should "select a new key" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    builder.stream[String, String](sourceTopic).selectKey((_, value) => value).to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    testDriver.readRecord[String, String](sinkTopic).key shouldBe "value1"
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value2"))
+    testDriver.readRecord[String, String](sinkTopic).key shouldBe "value2"
+
+    testDriver.close()
+  }
+
+  "join 2 KStreams" should "join correctly records" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic1 = "source1"
+    val sourceTopic2 = "source2"
+    val sinkTopic = "sink"
+
+    val stream1 = builder.stream[String, String](sourceTopic1)
+    val stream2 = builder.stream[String, String](sourceTopic2)
+    stream1.join(stream2)((a, b) => s"$a-$b", JoinWindows.of(1000)).to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
+    testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "topic1value1-topic2value1"
+
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
new file mode 100644
index 0000000..8c88ff5
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.kstream.Materialized
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.utils.TestDriver
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class KTableTest extends FlatSpec with Matchers with TestDriver {
+
+  "join 2 KTables" should "join correctly records" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic1 = "source1"
+    val sourceTopic2 = "source2"
+    val sinkTopic = "sink"
+
+    val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count()
+    val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count()
+    table1.join(table2)((a, b) => a + b).toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
+    testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
+    testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2
+
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "join 2 KTables with a Materialized" should "join correctly records and state store" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic1 = "source1"
+    val sourceTopic2 = "source2"
+    val sinkTopic = "sink"
+    val stateStore = "store"
+    val materialized = Materialized
+      .as[String, Long, ByteArrayKeyValueStore](stateStore)
+      .withKeySerde(Serdes.String)
+      .withValueSerde(Serdes.Long)
+
+    val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count()
+    val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count()
+    table1.join(table2, materialized)((a, b) => a + b).toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
+    testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
+    testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2
+    testDriver.getKeyValueStore[String, Long](stateStore).get("1") shouldBe 2
+
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 7891131..91634be 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -18,16 +18,16 @@ package org.apache.kafka.streams.scala
 
 import java.util.Properties
 
+import kafka.utils.MockTime
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization._
-import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.serialization.{LongDeserializer, LongSerializer, StringDeserializer, StringSerializer}
 import org.apache.kafka.streams._
 import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.streams.scala.utils.StreamToTableJoinTestData
 import org.apache.kafka.test.TestUtils
-import org.junit.Assert._
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.scalatest.junit.JUnitSuite
@@ -103,9 +103,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite wit
       produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
 
     streams.close()
-
-    import collection.JavaConverters._
-    assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key))
   }
 
   @Test def testShouldCountClicksPerRegionJava(): Unit = {
@@ -165,12 +162,8 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite wit
     val streams: KafkaStreamsJ = new KafkaStreamsJ(builder.build(), streamsConfiguration)
 
     streams.start()
-
-    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
-      produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
-
+    produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
     streams.close()
-    assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key))
   }
 
   private def getStreamsConfiguration(): Properties = {
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
new file mode 100644
index 0000000..9a3ee7f
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
@@ -0,0 +1,135 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.utils
+
+import java.util.Properties
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.streams._
+import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
+import org.apache.kafka.test.TestUtils
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Test suite base that prepares Kafka cluster for stream-table joins in Kafka Streams
+ * <p>
+ */
+class StreamToTableJoinScalaIntegrationTestBase extends JUnitSuite with StreamToTableJoinTestData {
+
+  private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
+
+  @Rule def cluster: EmbeddedKafkaCluster = privateCluster
+
+  final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
+  val mockTime: MockTime = cluster.time
+  mockTime.setCurrentTimeMs(alignedTime)
+
+  val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
+  @Rule def testFolder: TemporaryFolder = tFolder
+
+  @Before
+  def startKafkaCluster(): Unit = {
+    cluster.createTopic(userClicksTopic)
+    cluster.createTopic(userRegionsTopic)
+    cluster.createTopic(outputTopic)
+    cluster.createTopic(userClicksTopicJ)
+    cluster.createTopic(userRegionsTopicJ)
+    cluster.createTopic(outputTopicJ)
+  }
+
+  def getStreamsConfiguration(): Properties = {
+    val streamsConfiguration: Properties = new Properties()
+
+    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-scala-integration-test")
+    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000")
+    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath)
+
+    streamsConfiguration
+  }
+
+  private def getUserRegionsProducerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ProducerConfig.ACKS_CONFIG, "all")
+    p.put(ProducerConfig.RETRIES_CONFIG, "0")
+    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p
+  }
+
+  private def getUserClicksProducerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ProducerConfig.ACKS_CONFIG, "all")
+    p.put(ProducerConfig.RETRIES_CONFIG, "0")
+    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[LongSerializer])
+    p
+  }
+
+  private def getConsumerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ConsumerConfig.GROUP_ID_CONFIG, "join-scala-integration-test-standard-consumer")
+    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer])
+    p
+  }
+
+  def produceNConsume(userClicksTopic: String,
+                      userRegionsTopic: String,
+                      outputTopic: String,
+                      waitTillRecordsReceived: Boolean = true): java.util.List[KeyValue[String, Long]] = {
+
+    import collection.JavaConverters._
+
+    // Publish user-region information.
+    val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
+    IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
+                                                       userRegions.asJava,
+                                                       userRegionsProducerConfig,
+                                                       mockTime,
+                                                       false)
+
+    // Publish user-click information.
+    val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
+    IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
+                                                       userClicks.asJava,
+                                                       userClicksProducerConfig,
+                                                       mockTime,
+                                                       false)
+
+    if (waitTillRecordsReceived) {
+      // consume and verify result
+      val consumerConfig = getConsumerConfig()
+
+      IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig,
+                                                                 outputTopic,
+                                                                 expectedClicksPerRegion.asJava)
+    } else {
+      java.util.Collections.emptyList()
+    }
+  }
+}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinTestData.scala
similarity index 97%
rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
rename to streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinTestData.scala
index e9040ee..890d8c2 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinTestData.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.scala
+package org.apache.kafka.streams.scala.utils
 
 import org.apache.kafka.streams.KeyValue
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
new file mode 100644
index 0000000..1497dd7
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.utils
+
+import java.util.{Properties, UUID}
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.scala.StreamsBuilder
+import org.apache.kafka.streams.test.ConsumerRecordFactory
+import org.apache.kafka.streams.{StreamsConfig, TopologyTestDriver}
+import org.scalatest.Suite
+
+trait TestDriver { this: Suite =>
+
+  def createTestDriver(builder: StreamsBuilder, initialWallClockTimeMs: Long = System.currentTimeMillis()) = {
+    val config = new Properties()
+    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
+    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
+    config.put(StreamsConfig.STATE_DIR_CONFIG, s"out/state-store-${UUID.randomUUID()}")
+    new TopologyTestDriver(builder.build(), config, initialWallClockTimeMs)
+  }
+
+  implicit class TopologyTestDriverOps(inner: TopologyTestDriver) {
+    def pipeRecord[K, V](topic: String, record: (K, V), timestampMs: Long = System.currentTimeMillis())(
+      implicit serdeKey: Serde[K],
+      serdeValue: Serde[V]
+    ): Unit = {
+      val recordFactory = new ConsumerRecordFactory[K, V](serdeKey.serializer, serdeValue.serializer)
+      inner.pipeInput(recordFactory.create(topic, record._1, record._2, timestampMs))
+    }
+
+    def readRecord[K, V](topic: String)(implicit serdeKey: Serde[K], serdeValue: Serde[V]): ProducerRecord[K, V] =
+      inner.readOutput(topic, serdeKey.deserializer, serdeValue.deserializer)
+  }
+}

