kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7250: fix transform function in scala DSL to accept TransformerSupplier (#5468)
Date Tue, 07 Aug 2018 15:02:46 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new adf9d0e  KAFKA-7250: fix transform function in scala DSL to accept TransformerSupplier
(#5468)
adf9d0e is described below

commit adf9d0e193e2fd01b9367524849dc23b65a68e2d
Author: Michal Dziemianko <michal.dziemianko@gmail.com>
AuthorDate: Tue Aug 7 16:00:22 2018 +0100

    KAFKA-7250: fix transform function in scala DSL to accept TransformerSupplier (#5468)
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/streams/scala/FunctionConversions.scala  |  6 +++
 .../kafka/streams/scala/kstream/KStream.scala      | 35 ++++---------
 .../apache/kafka/streams/scala/TopologyTest.scala  | 59 ++++++++++++++++++++++
 3 files changed, 75 insertions(+), 25 deletions(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
index 9ce9838..abf1659 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
@@ -105,4 +105,10 @@ object FunctionConversions {
       override def apply(): VA = f()
     }
   }
+
+  implicit class TransformerSupplierFromFunction[K, V, VO](val f: () => Transformer[K,
V, VO]) extends AnyVal {
+    def asTransformerSupplier: TransformerSupplier[K, V, VO] = new TransformerSupplier[K,
V, VO] {
+      override def get(): Transformer[K, V, VO] = f()
+    }
+  }
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 0f1fc82..8806f5c 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream
 
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier,
TopicNameExtractor}
+import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
@@ -284,35 +284,20 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   /**
    * Transform each record of the input stream into zero or more records in the output stream
(both key and value type
    * can be altered arbitrarily).
-   * A `Transformer` is applied to each input record and computes zero or more output records.
In order to assign a
-   * state, the state must be created and registered beforehand via stores added via `addStateStore`
or `addGlobalStore`
+   * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input
record
+   * and computes zero or more output records.
+   * In order to assign a state, the state must be created and registered
+   * beforehand via stores added via `addStateStore` or `addGlobalStore`
    * before they can be connected to the `Transformer`
    *
-   * @param transformer the `Transformer` instance
+   * @param transformerSupplier the `TransformerSupplier` that generates `Transformer`
    * @param stateStoreNames     the names of the state stores used by the processor
    * @return a [[KStream]] that contains more or less records with new key and value (possibly
of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
-   */ 
-  def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)],
-    stateStoreNames: String*): KStream[K1, V1] = {
-    val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] = new TransformerSupplier[K,
V, KeyValue[K1, V1]] {
-      override def get(): Transformer[K, V, KeyValue[K1, V1]] = {
-        new Transformer[K, V, KeyValue[K1, V1]] {
-          override def transform(key: K, value: V): KeyValue[K1, V1] = {
-            transformer.transform(key, value) match {
-              case (k1, v1) => KeyValue.pair(k1, v1)
-              case _ => null
-            }
-          }
-
-          override def init(context: ProcessorContext): Unit = transformer.init(context)
-
-          override def close(): Unit = transformer.close()
-        }
-      }
-    }
-    inner.transform(transformerSupplierJ, stateStoreNames: _*)
-  }
+   */
+  def transform[K1, V1](transformerSupplier: () => Transformer[K, V, KeyValue[K1, V1]],
+                        stateStoreNames: String*): KStream[K1, V1] =
+    inner.transform(transformerSupplier.asTransformerSupplier, stateStoreNames: _*)
 
   /**
    * Transform the value of each input record into a new value (with possible new type) of
the output record.
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index ffae666..15b3c18 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -31,6 +31,8 @@ import ImplicitConversions._
 
 import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
 import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream
=> KGroupedStreamJ, _}
+import org.apache.kafka.streams.processor.ProcessorContext
+
 import collection.JavaConverters._
 
 /**
@@ -190,4 +192,61 @@ class TopologyTest extends JUnitSuite {
     // should match
     assertEquals(getTopologyScala(), getTopologyJava())
   }
+
+  @Test def shouldBuildIdenticalTopologyInJavaNScalaTransform() = {
+
+    // build the Scala topology
+    def getTopologyScala(): TopologyDescription = {
+
+      import Serdes._
+
+      val streamBuilder = new StreamsBuilder
+      val textLines = streamBuilder.stream[String, String](inputTopic)
+
+      val _: KTable[String, Long] =
+        textLines
+          .transform(() => new Transformer[String, String, KeyValue[String, String]] {
+              override def init(context: ProcessorContext): Unit = Unit
+              override def transform(key: String, value: String): KeyValue[String, String]
=
+                new KeyValue(key, value.toLowerCase)
+              override def close(): Unit = Unit
+          })
+          .groupBy((k, v) => v)
+          .count()
+
+      streamBuilder.build().describe()
+    }
+
+    // build the Java topology
+    def getTopologyJava(): TopologyDescription = {
+
+      val streamBuilder = new StreamsBuilderJ
+      val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
+
+      val lowered: KStreamJ[String, String] = textLines
+        .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
+        override def get(): Transformer[String, String, KeyValue[String, String]] = new Transformer[String,
String, KeyValue[String, String]] {
+          override def init(context: ProcessorContext): Unit = Unit
+
+          override def transform(key: String, value: String): KeyValue[String, String] =
+            new KeyValue(key, value.toLowerCase)
+
+          override def close(): Unit = Unit
+        }
+      })
+
+      val grouped: KGroupedStreamJ[String, String] = lowered.groupBy {
+        new KeyValueMapper[String, String, String] {
+          def apply(k: String, v: String): String = v
+        }
+      }
+
+      val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
+
+      streamBuilder.build().describe()
+    }
+
+    // should match
+    assertEquals(getTopologyScala(), getTopologyJava())
+  }
 }


Mime
View raw message