kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix Streams scala format violations (#5472)
Date Tue, 07 Aug 2018 19:50:47 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a9d7f8a  MINOR: Fix Streams scala format violations (#5472)
a9d7f8a is described below

commit a9d7f8a1fdfa129105c6b3a128a91524874071ef
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Wed Aug 8 01:20:42 2018 +0530

    MINOR: Fix Streams scala format violations (#5472)
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/streams/scala/kstream/KStream.scala      |  1 -
 .../apache/kafka/streams/scala/TopologyTest.scala  | 30 ++++++++++++----------
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index c02939a..a8766bd 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -299,7 +299,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
                         stateStoreNames: String*): KStream[K1, V1] =
     inner.transform(transformerSupplier.asTransformerSupplier, stateStoreNames: _*)
 
-
   /**
    * Transform the value of each input record into a new value (with possible new type) of the output record.
    * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 194abf5..8a0eabb 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -209,12 +209,15 @@ class TopologyTest extends JUnitSuite {
 
       val _: KTable[String, Long] =
         textLines
-          .transform(() => new Transformer[String, String, KeyValue[String, String]] {
-              override def init(context: ProcessorContext): Unit = Unit
-              override def transform(key: String, value: String): KeyValue[String, String] =
-                new KeyValue(key, value.toLowerCase)
-              override def close(): Unit = Unit
-          })
+          .transform(
+            () =>
+              new Transformer[String, String, KeyValue[String, String]] {
+                override def init(context: ProcessorContext): Unit = Unit
+                override def transform(key: String, value: String): KeyValue[String, String] =
+                  new KeyValue(key, value.toLowerCase)
+                override def close(): Unit = Unit
+            }
+          )
           .groupBy((k, v) => v)
           .count()
 
@@ -229,15 +232,16 @@ class TopologyTest extends JUnitSuite {
 
       val lowered: KStreamJ[String, String] = textLines
         .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
-        override def get(): Transformer[String, String, KeyValue[String, String]] = new Transformer[String, String, KeyValue[String, String]] {
-          override def init(context: ProcessorContext): Unit = Unit
+          override def get(): Transformer[String, String, KeyValue[String, String]] =
+            new Transformer[String, String, KeyValue[String, String]] {
+              override def init(context: ProcessorContext): Unit = Unit
 
-          override def transform(key: String, value: String): KeyValue[String, String] =
-            new KeyValue(key, value.toLowerCase)
+              override def transform(key: String, value: String): KeyValue[String, String] =
+                new KeyValue(key, value.toLowerCase)
 
-          override def close(): Unit = Unit
-        }
-      })
+              override def close(): Unit = Unit
+            }
+        })
 
       val grouped: KGroupedStreamJ[String, String] = lowered.groupBy {
         new KeyValueMapper[String, String, String] {


Mime
View raw message