kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Add Scalafmt to Streams Scala API (#4965)
Date Mon, 09 Jul 2018 23:48:43 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 05c5854  MINOR: Add Scalafmt to Streams Scala API (#4965)
05c5854 is described below

commit 05c5854d1ff8510c867066b65e28ad9414d6c266
Author: Joan Goyeau <joan@goyeau.com>
AuthorDate: Tue Jul 10 01:48:34 2018 +0200

    MINOR: Add Scalafmt to Streams Scala API (#4965)
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 build.gradle                                       |   9 ++
 checkstyle/.scalafmt.conf                          |  20 +++
 .../kafka/streams/scala/FunctionConversions.scala  |   8 +-
 .../kafka/streams/scala/ImplicitConversions.scala  |   5 +-
 .../org/apache/kafka/streams/scala/Serdes.scala    |  41 ++---
 .../kafka/streams/scala/StreamsBuilder.scala       |  32 ++--
 .../streams/scala/kstream/KGroupedStream.scala     |   7 +-
 .../streams/scala/kstream/KGroupedTable.scala      |   2 +-
 .../kafka/streams/scala/kstream/KStream.scala      | 179 ++++++++++-----------
 .../kafka/streams/scala/kstream/KTable.scala       | 134 ++++++++-------
 .../scala/kstream/SessionWindowedKStream.scala     |   9 +-
 .../scala/kstream/TimeWindowedKStream.scala        |   6 +-
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |  58 ++++---
 .../streams/scala/StreamToTableJoinTestData.scala  |   1 -
 .../apache/kafka/streams/scala/TopologyTest.scala  |  70 ++++----
 .../apache/kafka/streams/scala/WordCountTest.scala |  19 ++-
 16 files changed, 318 insertions(+), 282 deletions(-)

diff --git a/build.gradle b/build.gradle
index a0897e1..3b04e4b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -29,6 +29,15 @@ buildscript {
     classpath 'org.scoverage:gradle-scoverage:2.3.0'
     classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
     classpath 'org.owasp:dependency-check-gradle:3.2.1'
+    classpath "com.diffplug.spotless:spotless-plugin-gradle:3.10.0"
+  }
+}
+
+apply plugin: "com.diffplug.gradle.spotless"
+spotless {
+  scala {
+    target 'streams/**/*.scala'
+    scalafmt('1.5.1').configFile('checkstyle/.scalafmt.conf')
   }
 }
 
diff --git a/checkstyle/.scalafmt.conf b/checkstyle/.scalafmt.conf
new file mode 100644
index 0000000..057e3b9
--- /dev/null
+++ b/checkstyle/.scalafmt.conf
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+docstrings = JavaDoc
+maxColumn = 120
+continuationIndent.defnSite = 2
+assumeStandardLibraryStripMargin = true
+danglingParentheses = true
+rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers]
\ No newline at end of file
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
index 9ce9838..4a4c3b0 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
@@ -30,7 +30,7 @@ import java.lang.{Iterable => JIterable}
  * more expressive, with less boilerplate and more succinct.
  * <p>
  * For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not
- * have full support for SAM types. 
+ * have full support for SAM types.
  */
 object FunctionConversions {
 
@@ -40,7 +40,7 @@ object FunctionConversions {
     }
   }
 
-  implicit class MapperFromFunction[T, U, VR](val f:(T,U) => VR) extends AnyVal {
+  implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal {
     def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] {
       override def apply(key: T, value: U): VR = f(key, value)
     }
@@ -49,7 +49,7 @@ object FunctionConversions {
     }
   }
 
-  implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f:(K,V) => (KR, VR)) extends AnyVal {
+  implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {
     def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = new KeyValueMapper[K, V, KeyValue[KR, VR]] {
       override def apply(key: K, value: V): KeyValue[KR, VR] = {
         val (kr, vr) = f(key, value)
@@ -88,7 +88,7 @@ object FunctionConversions {
     }
   }
 
-  implicit class MergerFromFunction[K,VR](val f: (K, VR, VR) => VR) extends AnyVal {
+  implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal {
     def asMerger: Merger[K, VR] = new Merger[K, VR] {
       override def apply(aggKey: K, aggOne: VR, aggTwo: VR): VR = f(aggKey, aggOne, aggTwo)
     }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index 0c384a1..d1ff674 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -77,7 +77,8 @@ object ImplicitConversions {
                                                             valueSerde: Serde[V]): Materialized[K, V, S] =
     Materialized.`with`[K, V, S](keySerde, valueSerde)
 
-  implicit def joinedFromKeyValueOtherSerde[K, V, VO]
-    (implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: Serde[VO]): Joined[K, V, VO] =
+  implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
+                                                      valueSerde: Serde[V],
+                                                      otherValueSerde: Serde[VO]): Joined[K, V, VO] =
     Joined.`with`(keySerde, valueSerde, otherValueSerde)
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
index a0ffffa..8bfb083 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
@@ -25,47 +25,48 @@ import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, S
 import org.apache.kafka.streams.kstream.WindowedSerdes
 
 object Serdes {
-  implicit val String: Serde[String]                             = JSerdes.String()
-  implicit val Long: Serde[Long]                                 = JSerdes.Long().asInstanceOf[Serde[Long]]
-  implicit val JavaLong: Serde[java.lang.Long]                   = JSerdes.Long()
-  implicit val ByteArray: Serde[Array[Byte]]                     = JSerdes.ByteArray()
+  implicit val String: Serde[String] = JSerdes.String()
+  implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
+  implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
+  implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
   implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
-  implicit val Float: Serde[Float]                               = JSerdes.Float().asInstanceOf[Serde[Float]]
-  implicit val JavaFloat: Serde[java.lang.Float]                 = JSerdes.Float()
-  implicit val Double: Serde[Double]                             = JSerdes.Double().asInstanceOf[Serde[Double]]
-  implicit val JavaDouble: Serde[java.lang.Double]               = JSerdes.Double()
-  implicit val Integer: Serde[Int]                               = JSerdes.Integer().asInstanceOf[Serde[Int]]
-  implicit val JavaInteger: Serde[java.lang.Integer]             = JSerdes.Integer()
+  implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
+  implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
+  implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
+  implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
+  implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
+  implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
 
   implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
-  implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
+  implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] =
+    new WindowedSerdes.SessionWindowedSerde[T]()
 
   def fromFn[T >: Null](serializer: T => Array[Byte], deserializer: Array[Byte] => Option[T]): Serde[T] =
     JSerdes.serdeFrom(
       new Serializer[T] {
-        override def serialize(topic: String, data: T): Array[Byte]                = serializer(data)
+        override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
         override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
-        override def close(): Unit                                                 = ()
+        override def close(): Unit = ()
       },
       new Deserializer[T] {
-        override def deserialize(topic: String, data: Array[Byte]): T              = deserializer(data).orNull
+        override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
         override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
-        override def close(): Unit                                                 = ()
+        override def close(): Unit = ()
       }
     )
 
   def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
-    deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+                        deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
     JSerdes.serdeFrom(
       new Serializer[T] {
-        override def serialize(topic: String, data: T): Array[Byte]                = serializer(topic, data)
+        override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
         override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
-        override def close(): Unit                                                 = ()
+        override def close(): Unit = ()
       },
       new Deserializer[T] {
-        override def deserialize(topic: String, data: Array[Byte]): T              = deserializer(topic, data).orNull
+        override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
         override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
-        override def close(): Unit                                                 = ()
+        override def close(): Unit = ()
       }
     )
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index af342ac..fcec778 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -31,18 +31,18 @@ import ImplicitConversions._
 import scala.collection.JavaConverters._
 
 /**
-  * Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
-  */
+ * Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
+ */
 class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
 
   /**
    * Create a [[kstream.KStream]] from the specified topic.
    * <p>
-   * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`, 
+   * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
    * key and value deserializers etc. If the implicit is not found in scope, compiler error will result.
    * <p>
    * A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
-   * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]]. 
+   * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
    * {{{
    * // Brings all implicit conversions in scope
    * import ImplicitConversions._
@@ -88,11 +88,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
   /**
    * Create a [[kstream.KTable]] from the specified topic.
    * <p>
-   * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`, 
+   * The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
    * key and value deserializers etc. If the implicit is not found in scope, compiler error will result.
    * <p>
    * A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
-   * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]]. 
+   * converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
    * {{{
    * // Brings all implicit conversions in scope
    * import ImplicitConversions._
@@ -123,8 +123,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
    * @see #table(String)
    * @see `org.apache.kafka.streams.StreamsBuilder#table`
    */
-  def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
-    (implicit consumed: Consumed[K, V]): KTable[K, V] =
+  def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
+    implicit consumed: Consumed[K, V]
+  ): KTable[K, V] =
     inner.table[K, V](topic, consumed, materialized)
 
   /**
@@ -139,8 +140,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
     inner.globalTable(topic, consumed)
 
   /**
-   * Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized 
-   * in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers 
+   * Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized
+   * in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers
    * from the implicit `Consumed` instance will be used.
    *
    * @param topic the topic name
@@ -148,12 +149,13 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
    * @return a `GlobalKTable` for the specified topic
    * @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
    */
-  def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
-    (implicit consumed: Consumed[K, V]): GlobalKTable[K, V] =
+  def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
+    implicit consumed: Consumed[K, V]
+  ): GlobalKTable[K, V] =
     inner.globalTable(topic, consumed, materialized)
 
   /**
-   * Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`, 
+   * Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`,
    * `Transformer`, or `ValueTransformer` before it can be used.
    *
    * @param builder the builder used to obtain this state store `StateStore` instance
@@ -164,11 +166,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
   def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = inner.addStateStore(builder)
 
   /**
-   * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`, 
+   * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor`, `Transformer`,
    * or `ValueTransformer` (in contrast to regular stores).
    *
    * @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
-   */ 
+   */
   def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
                      topic: String,
                      consumed: Consumed[_, _],
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 0e5abfd..f6a22d9 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
-
 /**
  * Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
  *
@@ -41,7 +40,7 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * The result is written into a local `KeyValueStore` (which is basically an ever-updating materialized view)
    * provided by the given `materialized`.
    *
-   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
    * represent the latest (rolling) count (i.e., number of records) for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
@@ -55,8 +54,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
   /**
    * Combine the values of records in this stream by the grouped key.
    *
-   * @param reducer   a function `(V, V) => V` that computes a new aggregate result. 
-   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @param reducer   a function `(V, V) => V` that computes a new aggregate result.
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 99bc83e..76ea9ed 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -39,7 +39,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    * Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to
    * the same key into a new instance of [[KTable]].
    *
-   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
    * represent the latest (rolling) count (i.e., number of records) for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 0f1fc82..8f6aab8 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -46,7 +46,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param predicate a filter that is applied to each record
    * @return a [[KStream]] that contains only those records that satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KStream#filter`
-   */ 
+   */
   def filter(predicate: (K, V) => Boolean): KStream[K, V] =
     inner.filter(predicate.asPredicate)
 
@@ -57,7 +57,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param predicate a filter that is applied to each record
    * @return a [[KStream]] that contains only those records that do <em>not</em> satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KStream#filterNot`
-   */ 
+   */
   def filterNot(predicate: (K, V) => Boolean): KStream[K, V] =
     inner.filterNot(predicate.asPredicate)
 
@@ -70,7 +70,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param mapper a function `(K, V) => KR` that computes a new key for each record
    * @return a [[KStream]] that contains records with new key (possibly of different type) and unmodified value
    * @see `org.apache.kafka.streams.kstream.KStream#selectKey`
-   */ 
+   */
   def selectKey[KR](mapper: (K, V) => KR): KStream[KR, V] =
     inner.selectKey[KR](mapper.asKeyValueMapper)
 
@@ -83,7 +83,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param mapper a function `(K, V) => (KR, VR)` that computes a new output record
    * @return a [[KStream]] that contains records with new key and value (possibly both of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#map`
-   */ 
+   */
   def map[KR, VR](mapper: (K, V) => (KR, VR)): KStream[KR, VR] = {
     val kvMapper = mapper.tupled andThen tuple2ToKeyValue
     inner.map[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
@@ -97,7 +97,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param mapper, a function `V => VR` that computes a new output value
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#mapValues`
-   */ 
+   */
   def mapValues[VR](mapper: V => VR): KStream[K, VR] =
     inner.mapValues[VR](mapper.asValueMapper)
 
@@ -109,7 +109,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param mapper, a function `(K, V) => VR` that computes a new output value
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#mapValues`
-   */ 
+   */
   def mapValues[VR](mapper: (K, V) => VR): KStream[K, VR] =
     inner.mapValues[VR](mapper.asValueMapperWithKey)
 
@@ -122,10 +122,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param mapper function `(K, V) => Iterable[(KR, VR)]` that computes the new output records
    * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#flatMap`
-   */ 
+   */
   def flatMap[KR, VR](mapper: (K, V) => Iterable[(KR, VR)]): KStream[KR, VR] = {
     val kvMapper = mapper.tupled andThen (iter => iter.map(tuple2ToKeyValue).asJava)
-    inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k , v)).asKeyValueMapper)
+    inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
   }
 
   /**
@@ -139,7 +139,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param mapper a function `V => Iterable[VR]` that computes the new output values
    * @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type
    * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
-   */ 
+   */
   def flatMapValues[VR](mapper: V => Iterable[VR]): KStream[K, VR] =
     inner.flatMapValues[VR](mapper.asValueMapper)
 
@@ -154,7 +154,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param mapper a function `(K, V) => Iterable[VR]` that computes the new output values
    * @return a [[KStream]] that contains more or less records with unmodified keys and new values of different type
    * @see `org.apache.kafka.streams.kstream.KStream#flatMapValues`
-   */ 
+   */
   def flatMapValues[VR](mapper: (K, V) => Iterable[VR]): KStream[K, VR] =
     inner.flatMapValues[VR](mapper.asValueMapperWithKey)
 
@@ -187,7 +187,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
 
   /**
-   * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for 
+   * Materialize this stream to a topic and creates a new [[KStream]] from the topic using the `Produced` instance for
    * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`
    * <p>
    * The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit
@@ -219,7 +219,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.through(topic, produced)
 
   /**
-   * Materialize this stream to a topic using the `Produced` instance for 
+   * Materialize this stream to a topic using the `Produced` instance for
    * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`
    * <p>
    * The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit
@@ -250,34 +250,34 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.to(topic, produced)
 
   /**
-    * Dynamically materialize this stream to topics using the `Produced` instance for
-    * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`.
-    * The topic names for each record to send to is dynamically determined based on the given mapper.
-    * <p>
-    * The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit
-    * key and value serdes that will be converted to a `Produced` instance implicitly.
-    * <p>
-    * {{{
-    * Example:
-    *
-    * // brings implicit serdes in scope
-    * import Serdes._
-    *
-    * //..
-    * val clicksPerRegion: KTable[String, Long] = //..
-    *
-    * // Implicit serdes in scope will generate an implicit Produced instance, which
-    * // will be passed automatically to the call of through below
-    * clicksPerRegion.to(topicChooser)
-    *
-    * // Similarly you can create an implicit Produced and it will be passed implicitly
-    * // to the through call
-    * }}}
-    *
-    * @param extractor the extractor to determine the name of the Kafka topic to write to for reach record
-    * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
-    * @see `org.apache.kafka.streams.kstream.KStream#to`
-    */
+   * Dynamically materialize this stream to topics using the `Produced` instance for
+   * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`.
+   * The topic name to send each record to is dynamically determined based on the given extractor.
+   * <p>
+   * The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit
+   * key and value serdes that will be converted to a `Produced` instance implicitly.
+   * <p>
+   * {{{
+   * Example:
+   *
+   * // brings implicit serdes in scope
+   * import Serdes._
+   *
+   * //..
+   * val clicksPerRegion: KTable[String, Long] = //..
+   *
+   * // Implicit serdes in scope will generate an implicit Produced instance, which
+   * // will be passed automatically to the call of `to` below
+   * clicksPerRegion.to(topicChooser)
+   *
+   * // Similarly you can create an implicit Produced and it will be passed implicitly
+   * // to the `to` call
+   * }}}
+   *
+   * @param extractor the extractor to determine the name of the Kafka topic to write to for each record
+   * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner`
+   * @see `org.apache.kafka.streams.kstream.KStream#to`
+   */
   def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit =
     inner.to(extractor, produced)
 
@@ -292,25 +292,23 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param stateStoreNames     the names of the state stores used by the processor
    * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
-   */ 
-  def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)],
-    stateStoreNames: String*): KStream[K1, V1] = {
-    val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] = new TransformerSupplier[K, V, KeyValue[K1, V1]] {
-      override def get(): Transformer[K, V, KeyValue[K1, V1]] = {
-        new Transformer[K, V, KeyValue[K1, V1]] {
-          override def transform(key: K, value: V): KeyValue[K1, V1] = {
-            transformer.transform(key, value) match {
-              case (k1, v1) => KeyValue.pair(k1, v1)
-              case _ => null
-            }
+   */
+  def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)], stateStoreNames: String*): KStream[K1, V1] = {
+    val transformerSupplierJ: TransformerSupplier[K, V, KeyValue[K1, V1]] =
+      new TransformerSupplier[K, V, KeyValue[K1, V1]] {
+        override def get(): Transformer[K, V, KeyValue[K1, V1]] =
+          new Transformer[K, V, KeyValue[K1, V1]] {
+            override def transform(key: K, value: V): KeyValue[K1, V1] =
+              transformer.transform(key, value) match {
+                case (k1, v1) => KeyValue.pair(k1, v1)
+                case _        => null
+              }
+
+            override def init(context: ProcessorContext): Unit = transformer.init(context)
+
+            override def close(): Unit = transformer.close()
           }
-
-          override def init(context: ProcessorContext): Unit = transformer.init(context)
-
-          override def close(): Unit = transformer.close()
-        }
       }
-    }
     inner.transform(transformerSupplierJ, stateStoreNames: _*)
   }
 
@@ -318,14 +316,14 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * Transform the value of each input record into a new value (with possible new type) of the output record.
    * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
    * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and registered 
-   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` 
+   * In order to assign a state, the state must be created and registered
+   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
    *
    * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer`
    * @param stateStoreNames          the names of the state stores used by the processor
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-   */ 
+   */
   def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR],
                           stateStoreNames: String*): KStream[K, VR] =
     inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
@@ -334,29 +332,28 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * Transform the value of each input record into a new value (with possible new type) of the output record.
    * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
    * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and registered 
-   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` 
+   * In order to assign a state, the state must be created and registered
+   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
    *
    * @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
    * @param stateStoreNames          the names of the state stores used by the processor
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-   */ 
+   */
   def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
-                          stateStoreNames: String*): KStream[K, VR] = {
+                          stateStoreNames: String*): KStream[K, VR] =
     inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
-  }
 
   /**
    * Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given
    * `processorSupplier`).
-   * In order to assign a state, the state must be created and registered 
-   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer` 
+   * In order to assign a state, the state must be created and registered
+   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
    *
    * @param processorSupplier a function that generates a [[org.apache.kafka.stream.Processor]]
    * @param stateStoreNames   the names of the state store used by the processor
    * @see `org.apache.kafka.streams.kstream.KStream#process`
-   */ 
+   */
   def process(processorSupplier: () => Processor[K, V], stateStoreNames: String*): Unit = {
     val processorSupplierJ: ProcessorSupplier[K, V] = new ProcessorSupplier[K, V] {
       override def get(): Processor[K, V] = processorSupplier()
@@ -365,7 +362,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   }
 
   /**
-   * Group the records by their current key into a [[KGroupedStream]] 
+   * Group the records by their current key into a [[KGroupedStream]]
    * <p>
    * The user can either supply the `Serialized` instance as an implicit in scope or she can also provide an implicit
    * serdes that will be converted to a `Serialized` instance implicitly.
@@ -390,10 +387,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * // to the groupByKey call
    * }}}
    *
-   * @param (implicit) serialized the instance of Serialized that gives the serdes 
+   * @param (implicit) serialized the instance of Serialized that gives the serdes
    * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#groupByKey`
-   */ 
+   */
   def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStream[K, V] =
     inner.groupByKey(serialized)
 
@@ -427,18 +424,18 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @param selector a function that computes a new key for grouping
    * @return a [[KGroupedStream]] that contains the grouped records of the original [[KStream]]
    * @see `org.apache.kafka.streams.kstream.KStream#groupBy`
-   */ 
+   */
   def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Serialized[KR, V]): KGroupedStream[KR, V] =
     inner.groupBy(selector.asKeyValueMapper, serialized)
 
   /**
-   * Join records of this stream with another [[KStream]]'s records using windowed inner equi join with 
+   * Join records of this stream with another [[KStream]]'s records using windowed inner equi join with
    * serializers and deserializers supplied by the implicit `Joined` instance.
    *
    * @param otherStream the [[KStream]] to be joined with this stream
    * @param joiner      a function that computes the join result for a pair of matching records
    * @param windows     the specification of the `JoinWindows`
-   * @param joined      an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize 
+   * @param joined      an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
    *                    inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
    *                    key serde, value serde and other value serde in implicit scope and they will be
    *                    converted to the instance of `Joined` through implicit conversion
@@ -453,17 +450,17 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
 
   /**
-   * Join records of this stream with another [[KTable]]'s records using inner equi join with 
+   * Join records of this stream with another [[KTable]]'s records using inner equi join with
    * serializers and deserializers supplied by the implicit `Joined` instance.
    *
    * @param table    the [[KTable]] to be joined with this stream
    * @param joiner   a function that computes the join result for a pair of matching records
-   * @param joined   an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize 
+   * @param joined   an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
    *                 inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
    *                 key serde, value serde and other value serde in implicit scope and they will be
    *                 converted to the instance of `Joined` through implicit conversion
    * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
-   * one for each matched record-pair with the same key 
+   * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KStream#join`
    */
   def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
@@ -479,7 +476,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
    *                       one output for each input [[KStream]] record
    * @see `org.apache.kafka.streams.kstream.KStream#join`
-   */ 
+   */
   def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
     keyValueMapper: (K, V) => GK,
     joiner: (V, GV) => RV
@@ -491,20 +488,20 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     )
 
   /**
-   * Join records of this stream with another [[KStream]]'s records using windowed left equi join with 
+   * Join records of this stream with another [[KStream]]'s records using windowed left equi join with
    * serializers and deserializers supplied by the implicit `Joined` instance.
    *
    * @param otherStream the [[KStream]] to be joined with this stream
    * @param joiner      a function that computes the join result for a pair of matching records
    * @param windows     the specification of the `JoinWindows`
-   * @param joined      an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize 
+   * @param joined      an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
    *                    inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
    *                    key serde, value serde and other value serde in implicit scope and they will be
    *                    converted to the instance of `Joined` through implicit conversion
    * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
    *                    one for each matched record-pair with the same key and within the joining window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
-   */ 
+   */
   def leftJoin[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
     windows: JoinWindows
@@ -512,19 +509,19 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
 
   /**
-   * Join records of this stream with another [[KTable]]'s records using left equi join with 
+   * Join records of this stream with another [[KTable]]'s records using left equi join with
    * serializers and deserializers supplied by the implicit `Joined` instance.
    *
    * @param table    the [[KTable]] to be joined with this stream
    * @param joiner   a function that computes the join result for a pair of matching records
-   * @param joined   an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize 
+   * @param joined   an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
    *                 inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
    *                 key serde, value serde and other value serde in implicit scope and they will be
    *                 converted to the instance of `Joined` through implicit conversion
    * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
-   *                 one for each matched record-pair with the same key 
+   *                 one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
-   */ 
+   */
   def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
     inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
 
@@ -538,7 +535,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
    *                       one output for each input [[KStream]] record
    * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
-   */ 
+   */
   def leftJoin[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
     keyValueMapper: (K, V) => GK,
     joiner: (V, GV) => RV
@@ -546,20 +543,20 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
     inner.leftJoin[GK, GV, RV](globalKTable, keyValueMapper.asKeyValueMapper, joiner.asValueJoiner)
 
   /**
-   * Join records of this stream with another [[KStream]]'s records using windowed outer equi join with 
+   * Join records of this stream with another [[KStream]]'s records using windowed outer equi join with
    * serializers and deserializers supplied by the implicit `Joined` instance.
    *
    * @param otherStream the [[KStream]] to be joined with this stream
    * @param joiner      a function that computes the join result for a pair of matching records
    * @param windows     the specification of the `JoinWindows`
-   * @param joined      an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize 
+   * @param joined      an implicit `Joined` instance that defines the serdes to be used to serialize/deserialize
    *                    inputs and outputs of the joined streams. Instead of `Joined`, the user can also supply
    *                    key serde, value serde and other value serde in implicit scope and they will be
    *                    converted to the instance of `Joined` through implicit conversion
    * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
    * one for each matched record-pair with the same key and within the joining window intervals
    * @see `org.apache.kafka.streams.kstream.KStream#outerJoin`
-   */ 
+   */
   def outerJoin[VO, VR](otherStream: KStream[K, VO])(
     joiner: (V, VO) => VR,
     windows: JoinWindows
@@ -569,8 +566,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   /**
    * Merge this stream and the given stream into one larger stream.
    * <p>
-   * There is no ordering guarantee between records from this `KStream` and records from the provided `KStream` 
-   * in the merged stream. Relative order is preserved within each input stream though (ie, records within 
+   * There is no ordering guarantee between records from this `KStream` and records from the provided `KStream`
+   * in the merged stream. Relative order is preserved within each input stream though (ie, records within
    * one input stream are processed in order).
    *
    * @param stream a stream which is to be merged into this stream
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index cff1844..b669771 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -44,10 +44,9 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @param predicate a filter that is applied to each record
    * @return a [[KTable]] that contains only those records that satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KTable#filter`
-   */ 
-  def filter(predicate: (K, V) => Boolean): KTable[K, V] = {
+   */
+  def filter(predicate: (K, V) => Boolean): KTable[K, V] =
     inner.filter(predicate(_, _))
-  }
 
   /**
    * Create a new [[KTable]] that consists all records of this [[KTable]] which satisfies the given
@@ -55,12 +54,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    *
    * @param predicate a filter that is applied to each record
    * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
-   *                      should be materialized. 
+   *                      should be materialized.
    * @return a [[KTable]] that contains only those records that satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KTable#filter`
-   */ 
-  def filter(predicate: (K, V) => Boolean,
-    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+   */
+  def filter(predicate: (K, V) => Boolean, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
     inner.filter(predicate.asPredicate, materialized)
 
   /**
@@ -70,7 +68,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @param predicate a filter that is applied to each record
    * @return a [[KTable]] that contains only those records that do <em>not</em> satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KTable#filterNot`
-   */ 
+   */
   def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
     inner.filterNot(predicate(_, _))
 
@@ -80,12 +78,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    *
    * @param predicate a filter that is applied to each record
    * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
-   *                      should be materialized. 
+   *                      should be materialized.
    * @return a [[KTable]] that contains only those records that do <em>not</em> satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KTable#filterNot`
-   */ 
-  def filterNot(predicate: (K, V) => Boolean,
-    materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+   */
+  def filterNot(predicate: (K, V) => Boolean, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
     inner.filterNot(predicate.asPredicate, materialized)
 
   /**
@@ -97,7 +94,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @param mapper, a function `V => VR` that computes a new output value
    * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
-   */ 
+   */
   def mapValues[VR](mapper: V => VR): KTable[K, VR] =
     inner.mapValues[VR](mapper.asValueMapper)
 
@@ -109,12 +106,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    *
    * @param mapper, a function `V => VR` that computes a new output value
    * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
-   *                      should be materialized. 
+   *                      should be materialized.
    * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
-   */ 
-  def mapValues[VR](mapper: V => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+   */
+  def mapValues[VR](mapper: V => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
     inner.mapValues[VR](mapper.asValueMapper, materialized)
 
   /**
@@ -126,7 +122,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @param mapper, a function `(K, V) => VR` that computes a new output value
    * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
-   */ 
+   */
   def mapValues[VR](mapper: (K, V) => VR): KTable[K, VR] =
     inner.mapValues[VR](mapper.asValueMapperWithKey)
 
@@ -138,12 +134,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    *
    * @param mapper, a function `(K, V) => VR` that computes a new output value
    * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
-   *                      should be materialized. 
+   *                      should be materialized.
    * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
-   */ 
-  def mapValues[VR](mapper: (K, V) => VR,
-    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+   */
+  def mapValues[VR](mapper: (K, V) => VR, materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
     inner.mapValues[VR](mapper.asValueMapperWithKey)
 
   /**
@@ -165,57 +160,55 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
     inner.toStream[KR](mapper.asKeyValueMapper)
 
   /**
-    * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
-    * Transform the value of each input record into a new value (with possible new type) of the output record.
-    * A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each input
-    * record value and computes a new value for it.
-    * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing access to additional state-stores,
-    * and to the `ProcessorContext`.
-    * If the downstream topology uses aggregation functions, (e.g. `KGroupedTable#reduce`, `KGroupedTable#aggregate`, etc),
-    * care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct
-    * aggregate results.
-    * In contrast, if the resulting KTable is materialized, (cf. `#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)`),
-    * such concerns are handled for you.
-    * In order to assign a state, the state must be created and registered
-    * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
-    *
-    * @param valueTransformerWithKeySupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
-    *                                 At least one transformer instance will be created per streaming task.
-    *                                 Transformer implementations doe not need to be thread-safe.
-    * @param stateStoreNames          the names of the state stores used by the processor
-    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
-    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-    */
+   * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
+   * Transform the value of each input record into a new value (with possible new type) of the output record.
+   * A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each input
+   * record value and computes a new value for it.
+   * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing access to additional state-stores,
+   * and to the `ProcessorContext`.
+   * If the downstream topology uses aggregation functions, (e.g. `KGroupedTable#reduce`, `KGroupedTable#aggregate`, etc),
+   * care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct
+   * aggregate results.
+   * In contrast, if the resulting KTable is materialized, (cf. `#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)`),
+   * such concerns are handled for you.
+   * In order to assign a state, the state must be created and registered
+   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
+   *
+   * @param valueTransformerWithKeySupplier an instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
+   *                                 At least one transformer instance will be created per streaming task.
+   *                                 Transformer implementations do not need to be thread-safe.
+   * @param stateStoreNames          the names of the state stores used by the processor
+   * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KTable#transformValues`
+   */
   def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
-                          stateStoreNames: String*): KTable[K, VR] = {
+                          stateStoreNames: String*): KTable[K, VR] =
     inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*)
-  }
 
   /**
-    * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
-    * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
-    * record value and computes a new value for it.
-    * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing stateful, rather than stateless,
-    * record-by-record operation, access to additional state-stores, and access to the `ProcessorContext`.
-    * In order to assign a state, the state must be created and registered
-    * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
-    * The resulting `KTable` is materialized into another state store (additional to the provided state store names)
-    * as specified by the user via `Materialized` parameter, and is queryable through its given name.
-    *
-    * @param valueTransformerWithKeySupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
-    *                                 At least one transformer instance will be created per streaming task.
-    *                                 Transformer implementations doe not need to be thread-safe.
-    * @param materialized             an instance of `Materialized` used to describe how the state store of the
-    *                                 resulting table should be materialized.
-    * @param stateStoreNames          the names of the state stores used by the processor
-    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
-    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-    */
+   * Create a new `KTable` by transforming the value of each record in this `KTable` into a new value, (with possibly new type).
+   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
+   * record value and computes a new value for it.
+   * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, allowing stateful, rather than stateless,
+   * record-by-record operation, access to additional state-stores, and access to the `ProcessorContext`.
+   * In order to assign a state, the state must be created and registered
+   * beforehand via stores added via `addStateStore` or `addGlobalStore` before they can be connected to the `Transformer`
+   * The resulting `KTable` is materialized into another state store (additional to the provided state store names)
+   * as specified by the user via `Materialized` parameter, and is queryable through its given name.
+   *
+   * @param valueTransformerWithKeySupplier an instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
+   *                                 At least one transformer instance will be created per streaming task.
+   *                                 Transformer implementations do not need to be thread-safe.
+   * @param materialized             an instance of `Materialized` used to describe how the state store of the
+   *                                 resulting table should be materialized.
+   * @param stateStoreNames          the names of the state stores used by the processor
+   * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
+   * @see `org.apache.kafka.streams.kstream.KTable#transformValues`
+   */
   def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
                           materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
-                          stateStoreNames: String*): KTable[K, VR] = {
+                          stateStoreNames: String*): KTable[K, VR] =
     inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*)
-  }
 
   /**
    * Re-groups the records of this [[KTable]] using the provided key/value mapper
@@ -247,7 +240,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @param other  the other [[KTable]] to be joined with this [[KTable]]
    * @param joiner a function that computes the join result for a pair of matching records
    * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
-   *                      should be materialized. 
+   *                      should be materialized.
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#join`
@@ -276,7 +269,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @param other  the other [[KTable]] to be joined with this [[KTable]]
    * @param joiner a function that computes the join result for a pair of matching records
    * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
-   *                      should be materialized. 
+   *                      should be materialized.
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
@@ -305,11 +298,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @param other  the other [[KTable]] to be joined with this [[KTable]]
    * @param joiner a function that computes the join result for a pair of matching records
    * @param materialized  a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
-   *                      should be materialized. 
+   *                      should be materialized.
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    * one for each matched record-pair with the same key
    * @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
-   */ 
+   */
   def outerJoin[VO, VR](other: KTable[K, VO])(
     joiner: (V, VO) => VR,
     materialized: Materialized[K, VR, ByteArrayKeyValueStore]
@@ -323,4 +316,3 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    */
   def queryableStoreName: String = inner.queryableStoreName
 }
-
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index ed41973..a602767 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -46,8 +46,7 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
-                                        merger: (K, VR, VR) => VR)(
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(
     implicit materialized: Materialized[K, VR, ByteArraySessionStore]
   ): KTable[Windowed[K], VR] =
     inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
@@ -55,7 +54,7 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
   /**
    * Count the number of records in this stream by the grouped key into `SessionWindows`.
    *
-   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a windowed [[KTable]] that contains "update" records with unmodified keys and `Long` values
    * that represent the latest (rolling) count (i.e., number of records) for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
@@ -69,8 +68,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
   /**
    * Combine values of this stream by the grouped key into {@link SessionWindows}.
    *
-   * @param reducer           a reducer function that computes a new aggregate result. 
-   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @param reducer           a reducer function that computes a new aggregate result.
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a windowed [[KTable]] that contains "update" records with unmodified keys, and values that represent
    * the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index 9e31ab9..9be5794 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -53,11 +53,11 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
   /**
    * Count the number of records in this stream by the grouped key and the defined windows.
    *
-   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
    * represent the latest (rolling) count (i.e., number of records) for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
-   */ 
+   */
   def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K], Long] = {
     val c: KTable[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
@@ -68,7 +68,7 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    * Combine the values of records in this stream by the grouped key.
    *
    * @param reducer   a function that computes a new aggregate result
-   * @param materialized  an instance of `Materialized` used to materialize a state store. 
+   * @param materialized  an instance of `Materialized` used to materialize a state store.
    * @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
    * latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 02d1dab..7891131 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -40,9 +40,8 @@ import org.scalatest.junit.JUnitSuite
  * <p>
  * Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11.
  * Hence the native Java API based version is more verbose.
- */ 
-class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
-  with StreamToTableJoinTestData {
+ */
+class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite with StreamToTableJoinTestData {
 
   private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
 
@@ -67,7 +66,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
 
   @Test def testShouldCountClicksPerRegion(): Unit = {
 
-    // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced, 
+    // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
     // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will
     // get these instances automatically
     import Serdes._
@@ -84,7 +83,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
     val clicksPerRegion: KTable[String, Long] =
       userClicksStream
 
-        // Join the stream against the table.
+      // Join the stream against the table.
         .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
 
         // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
@@ -100,8 +99,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
     val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
     streams.start()
 
-
-    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = 
+    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
       produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
 
     streams.close()
@@ -126,29 +124,32 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
 
     val builder: StreamsBuilderJ = new StreamsBuilderJ()
 
-    val userClicksStream: KStreamJ[String, JLong] = 
+    val userClicksStream: KStreamJ[String, JLong] =
       builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.String, Serdes.JavaLong))
 
-    val userRegionsTable: KTableJ[String, String] = 
+    val userRegionsTable: KTableJ[String, String] =
       builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.String, Serdes.String))
 
     // Join the stream against the table.
     val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
-      .leftJoin(userRegionsTable, 
+      .leftJoin(
+        userRegionsTable,
         new ValueJoiner[JLong, String, (String, JLong)] {
-          def apply(clicks: JLong, region: String): (String, JLong) = 
+          def apply(clicks: JLong, region: String): (String, JLong) =
             (if (region == null) "UNKNOWN" else region, clicks)
-        }, 
-        Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
+        },
+        Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String)
+      )
 
     // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
-    val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
-      .map { 
+    val clicksByRegion: KStreamJ[String, JLong] = userClicksJoinRegion
+      .map {
         new KeyValueMapper[String, (String, JLong), KeyValue[String, JLong]] {
-          def apply(k: String, regionWithClicks: (String, JLong)) = new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
+          def apply(k: String, regionWithClicks: (String, JLong)) =
+            new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
         }
       }
-        
+
     // Compute the total per region by summing the individual click counts per region.
     val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
       .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
@@ -157,7 +158,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
           def apply(v1: JLong, v2: JLong) = v1 + v2
         }
       }
-        
+
     // Write the (continuously updating) results to the output topic.
     clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.String, Serdes.JavaLong))
 
@@ -165,7 +166,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
 
     streams.start()
 
-    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = 
+    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
       produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
 
     streams.close()
@@ -214,17 +215,27 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
     p
   }
 
-  private def produceNConsume(userClicksTopic: String, userRegionsTopic: String, outputTopic: String): java.util.List[KeyValue[String, Long]] = {
+  private def produceNConsume(userClicksTopic: String,
+                              userRegionsTopic: String,
+                              outputTopic: String): java.util.List[KeyValue[String, Long]] = {
 
     import collection.JavaConverters._
-    
+
     // Publish user-region information.
     val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
-    IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, userRegions.asJava, userRegionsProducerConfig, mockTime, false)
+    IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
+                                                       userRegions.asJava,
+                                                       userRegionsProducerConfig,
+                                                       mockTime,
+                                                       false)
 
     // Publish user-click information.
     val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
-    IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic, userClicks.asJava, userClicksProducerConfig, mockTime, false)
+    IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
+                                                       userClicks.asJava,
+                                                       userClicksProducerConfig,
+                                                       mockTime,
+                                                       false)
 
     // consume and verify result
     val consumerConfig = getConsumerConfig()
@@ -232,4 +243,3 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
     IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedClicksPerRegion.size)
   }
 }
-
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
index 45715a7..e9040ee 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
@@ -58,4 +58,3 @@ trait StreamToTableJoinTestData {
     new KeyValue("asia", 124L)
   )
 }
-
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index ffae666..f04ec5d 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -35,7 +35,7 @@ import collection.JavaConverters._
 
 /**
  * Test suite that verifies that the topology built by the Java and Scala APIs match.
- */ 
+ */
 class TopologyTest extends JUnitSuite {
 
   val inputTopic = "input-topic"
@@ -50,22 +50,22 @@ class TopologyTest extends JUnitSuite {
     def getTopologyScala(): TopologyDescription = {
 
       import Serdes._
-  
+
       val streamBuilder = new StreamsBuilder
       val textLines = streamBuilder.stream[String, String](inputTopic)
-  
+
       val _: KStream[String, String] =
         textLines.flatMapValues(v => pattern.split(v.toLowerCase))
-  
+
       streamBuilder.build().describe()
     }
-  
+
     // build the Java topology
     def getTopologyJava(): TopologyDescription = {
 
       val streamBuilder = new StreamsBuilderJ
       val textLines = streamBuilder.stream[String, String](inputTopic)
-  
+
       val _: KStreamJ[String, String] = textLines.flatMapValues {
         new ValueMapper[String, java.lang.Iterable[String]] {
           def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
@@ -84,15 +84,16 @@ class TopologyTest extends JUnitSuite {
     def getTopologyScala(): TopologyDescription = {
 
       import Serdes._
-  
+
       val streamBuilder = new StreamsBuilder
       val textLines = streamBuilder.stream[String, String](inputTopic)
-  
+
       val _: KTable[String, Long] =
-        textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+        textLines
+          .flatMapValues(v => pattern.split(v.toLowerCase))
           .groupBy((k, v) => v)
           .count()
-  
+
       streamBuilder.build().describe()
     }
 
@@ -101,21 +102,21 @@ class TopologyTest extends JUnitSuite {
 
       val streamBuilder = new StreamsBuilderJ
       val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
-  
+
       val splits: KStreamJ[String, String] = textLines.flatMapValues {
         new ValueMapper[String, java.lang.Iterable[String]] {
           def apply(s: String): java.lang.Iterable[String] = pattern.split(s.toLowerCase).toIterable.asJava
         }
       }
-  
+
       val grouped: KGroupedStreamJ[String, String] = splits.groupBy {
         new KeyValueMapper[String, String, String] {
           def apply(k: String, v: String): String = v
         }
       }
-  
+
       val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
-  
+
       streamBuilder.build().describe()
     }
 
@@ -128,13 +129,13 @@ class TopologyTest extends JUnitSuite {
     // build the Scala topology
     def getTopologyScala(): TopologyDescription = {
       import Serdes._
-  
+
       val builder = new StreamsBuilder()
-  
+
       val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
-  
+
       val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic)
-  
+
       val clicksPerRegion: KTable[String, Long] =
         userClicksStream
           .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
@@ -149,32 +150,35 @@ class TopologyTest extends JUnitSuite {
     def getTopologyJava(): TopologyDescription = {
 
       import java.lang.{Long => JLong}
-  
+
       val builder: StreamsBuilderJ = new StreamsBuilderJ()
-  
-      val userClicksStream: KStreamJ[String, JLong] = 
+
+      val userClicksStream: KStreamJ[String, JLong] =
         builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String, Serdes.JavaLong))
-  
-      val userRegionsTable: KTableJ[String, String] = 
+
+      val userRegionsTable: KTableJ[String, String] =
         builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String, Serdes.String))
-  
+
       // Join the stream against the table.
       val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
-        .leftJoin(userRegionsTable, 
+        .leftJoin(
+          userRegionsTable,
           new ValueJoiner[JLong, String, (String, JLong)] {
-            def apply(clicks: JLong, region: String): (String, JLong) = 
+            def apply(clicks: JLong, region: String): (String, JLong) =
               (if (region == null) "UNKNOWN" else region, clicks)
-          }, 
-          Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String))
-  
+          },
+          Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String)
+        )
+
       // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
-      val clicksByRegion : KStreamJ[String, JLong] = userClicksJoinRegion
-        .map { 
+      val clicksByRegion: KStreamJ[String, JLong] = userClicksJoinRegion
+        .map {
           new KeyValueMapper[String, (String, JLong), KeyValue[String, JLong]] {
-            def apply(k: String, regionWithClicks: (String, JLong)) = new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
+            def apply(k: String, regionWithClicks: (String, JLong)) =
+              new KeyValue[String, JLong](regionWithClicks._1, regionWithClicks._2)
           }
         }
-          
+
       // Compute the total per region by summing the individual click counts per region.
       val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
         .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index 5abc1bc..5d858d8 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -50,7 +50,7 @@ import ImplicitConversions._
  * <p>
  * Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11.
  * Hence the native Java API based version is more verbose.
- */ 
+ */
 class WordCountTest extends JUnitSuite with WordCountTestData {
 
   private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
@@ -61,11 +61,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
   val mockTime: MockTime = cluster.time
   mockTime.setCurrentTimeMs(alignedTime)
 
-
   val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
   @Rule def testFolder: TemporaryFolder = tFolder
-    
-
   @Before
   def startKafkaCluster(): Unit = {
     cluster.createTopic(inputTopic)
@@ -86,7 +83,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
 
     // generate word counts
     val wordCounts: KTable[String, Long] =
-      textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+      textLines
+        .flatMapValues(v => pattern.split(v.toLowerCase))
         .groupBy((_, v) => v)
         .count()
 
@@ -117,7 +115,8 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
 
     // generate word counts
     val wordCounts: KTable[String, Long] =
-      textLines.flatMapValues(v => pattern.split(v.toLowerCase))
+      textLines
+        .flatMapValues(v => pattern.split(v.toLowerCase))
         .groupBy((k, v) => v)
         .count()(Materialized.as("word-count"))
 
@@ -139,7 +138,12 @@ class WordCountTest extends JUnitSuite with WordCountTestData {
   @Test def testShouldCountWordsJava(): Unit = {
 
     import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
-    import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, KGroupedStream => KGroupedStreamJ, _}
+    import org.apache.kafka.streams.kstream.{
+      KTable => KTableJ,
+      KStream => KStreamJ,
+      KGroupedStream => KGroupedStreamJ,
+      _
+    }
     import collection.JavaConverters._
 
     val streamsConfiguration = getStreamsConfiguration()
@@ -250,4 +254,3 @@ trait WordCountTestData {
     new KeyValue("слова", 1L)
   )
 }
-


Mime
View raw message