kafka-commits mailing list archives

From cado...@apache.org
Subject [kafka] branch trunk updated: KAFKA-12862: Update Scala fmt library and apply fixes (#10784)
Date Mon, 09 Aug 2021 10:07:40 GMT
This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 83f0ae3  KAFKA-12862: Update Scala fmt library and apply fixes (#10784)
83f0ae3 is described below

commit 83f0ae3821033d6c564c6df0d55c06e8642bf231
Author: Josep Prat <josep.prat@aiven.io>
AuthorDate: Mon Aug 9 12:05:31 2021 +0200

    KAFKA-12862: Update Scala fmt library and apply fixes (#10784)
    
    Updates the scalafmt library to the latest stable version.
    Applies all the resulting style fixes (all source code changes were
    made by scalafmt).
    Removes the dangling-parentheses setting, as `true` is already the
    default.
    
    Reviewer: John Roesler <john@confluent.io>
---
 checkstyle/.scalafmt.conf                          |   1 -
 gradle/dependencies.gradle                         |   2 +-
 .../kafka/streams/scala/ImplicitConversions.scala  |  22 +++--
 .../org/apache/kafka/streams/scala/Serdes.scala    |   6 +-
 .../kafka/streams/scala/StreamsBuilder.scala       |  18 ++--
 .../streams/scala/kstream/BranchedKStream.scala    |   4 +-
 .../streams/scala/kstream/CogroupedKStream.scala   |  14 +--
 .../kafka/streams/scala/kstream/Consumed.scala     |  10 +-
 .../kafka/streams/scala/kstream/Joined.scala       |   8 +-
 .../streams/scala/kstream/KGroupedStream.scala     |  13 +--
 .../streams/scala/kstream/KGroupedTable.scala      |  29 +++---
 .../kafka/streams/scala/kstream/KStream.scala      | 100 ++++++++++++-------
 .../kafka/streams/scala/kstream/KTable.scala       | 108 +++++++++++++--------
 .../kafka/streams/scala/kstream/Materialized.scala |  15 +--
 .../kafka/streams/scala/kstream/Produced.scala     |   5 +-
 .../streams/scala/kstream/Repartitioned.scala      |   5 +-
 .../kstream/SessionWindowedCogroupedKStream.scala  |   8 +-
 .../scala/kstream/SessionWindowedKStream.scala     |  16 +--
 .../kafka/streams/scala/kstream/StreamJoined.scala |   8 +-
 .../kstream/TimeWindowedCogroupedKStream.scala     |   8 +-
 .../scala/kstream/TimeWindowedKStream.scala        |  16 +--
 .../kafka/streams/scala/serialization/Serdes.scala |  12 ++-
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |   4 +-
 .../apache/kafka/streams/scala/TopologyTest.scala  |  50 +++++-----
 ...StreamToTableJoinScalaIntegrationTestBase.scala |  42 ++++----
 25 files changed, 309 insertions(+), 215 deletions(-)

diff --git a/checkstyle/.scalafmt.conf b/checkstyle/.scalafmt.conf
index 057e3b9..4c4fcf3 100644
--- a/checkstyle/.scalafmt.conf
+++ b/checkstyle/.scalafmt.conf
@@ -16,5 +16,4 @@ docstrings = JavaDoc
 maxColumn = 120
 continuationIndent.defnSite = 2
 assumeStandardLibraryStripMargin = true
-danglingParentheses = true
 rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers]
\ No newline at end of file
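
For context, the `danglingParentheses` setting removed above drives the style change visible throughout this diff: when a definition site exceeds `maxColumn`, scalafmt places each parameter on its own line and lets the closing parenthesis "dangle". Since `true` became the scalafmt 2.x default, the explicit entry is redundant. A minimal sketch (method and parameter names are illustrative, not from this diff):

    // danglingParentheses = true (the default): parameters indented,
    // closing parenthesis on its own line, i.e. the "after" style below.
    def configure(
      bootstrapServers: String,
      applicationId: String
    ): Unit = ()

    // danglingParentheses = false would instead align parameters under
    // the opening parenthesis, i.e. the "before" style in this diff:
    // def configure(bootstrapServers: String,
    //               applicationId: String): Unit = ()
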
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index eeeecaf..7dc9ed5 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -106,7 +106,7 @@ versions += [
   reflections: "0.9.12",
   rocksDB: "6.19.3",
   scalaCollectionCompat: "2.4.4",
-  scalafmt: "1.5.1",
+  scalafmt: "2.7.5",
   scalaJava8Compat : "1.0.0",
   scoverage: "1.4.1",
   slf4j: "1.7.30",
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index ab8f319..5f7064b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -80,13 +80,17 @@ object ImplicitConversions {
   implicit def groupedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Grouped[K, V] =
     Grouped.`with`[K, V]
 
-  implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
-                                                      valueSerde: Serde[V],
-                                                      otherValueSerde: Serde[VO]): Joined[K, V, VO] =
+  implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit
+    keySerde: Serde[K],
+    valueSerde: Serde[V],
+    otherValueSerde: Serde[VO]
+  ): Joined[K, V, VO] =
     Joined.`with`[K, V, VO]
 
-  implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K],
-                                                            valueSerde: Serde[V]): Materialized[K, V, S] =
+  implicit def materializedFromSerde[K, V, S <: StateStore](implicit
+    keySerde: Serde[K],
+    valueSerde: Serde[V]
+  ): Materialized[K, V, S] =
     Materialized.`with`[K, V, S]
 
   implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
@@ -95,8 +99,10 @@ object ImplicitConversions {
   implicit def repartitionedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Repartitioned[K, V] =
     Repartitioned.`with`[K, V]
 
-  implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
-                                                          valueSerde: Serde[V],
-                                                          otherValueSerde: Serde[VO]): StreamJoined[K, V, VO] =
+  implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit
+    keySerde: Serde[K],
+    valueSerde: Serde[V],
+    otherValueSerde: Serde[VO]
+  ): StreamJoined[K, V, VO] =
     StreamJoined.`with`[K, V, VO]
 }
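
These implicit conversions only change shape here, not behavior: with key and value serdes in implicit scope they derive `Consumed`, `Joined`, `Materialized`, and friends automatically. A sketch of typical use, assuming the standard wildcard imports (the topic name is illustrative):

    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.serialization.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.KTable

    val builder = new StreamsBuilder
    // consumedFromSerde (not shown in the hunk above) derives the
    // implicit Consumed[String, Long] from the imported serdes, so no
    // explicit Consumed argument is needed here.
    val table: KTable[String, Long] = builder.table[String, Long]("input-topic")
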
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
index a36fef8..2e42090 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
@@ -58,8 +58,10 @@ object Serdes {
       }
     )
 
-  def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
-                        deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+  def fromFn[T >: Null](
+    serializer: (String, T) => Array[Byte],
+    deserializer: (String, Array[Byte]) => Option[T]
+  ): Serde[T] =
     JSerdes.serdeFrom(
       new Serializer[T] {
         override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
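
`fromFn` builds a Kafka `Serde` from two plain functions; this hunk only reflows its parameter list. A usage sketch (the UTF-8 serde below is illustrative, not part of the change):

    import java.nio.charset.StandardCharsets
    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.streams.scala.serialization.Serdes

    // The first argument of each function is the topic name, which this
    // serde ignores; a None from the deserializer maps back to null.
    val utf8Serde: Serde[String] = Serdes.fromFn[String](
      (_: String, data: String) => data.getBytes(StandardCharsets.UTF_8),
      (_: String, bytes: Array[Byte]) =>
        Option(bytes).map(b => new String(b, StandardCharsets.UTF_8))
    )
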
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 3eeffc4..9430a51 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -120,8 +120,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
    * @see #table(String)
    * @see `org.apache.kafka.streams.StreamsBuilder#table`
    */
-  def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
-    implicit consumed: Consumed[K, V]
+  def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(implicit
+    consumed: Consumed[K, V]
   ): KTable[K, V] =
     new KTable(inner.table[K, V](topic, consumed, materialized))
 
@@ -146,8 +146,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
    * @return a `GlobalKTable` for the specified topic
    * @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
    */
-  def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
-    implicit consumed: Consumed[K, V]
+  def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(implicit
+    consumed: Consumed[K, V]
   ): GlobalKTable[K, V] =
     inner.globalTable(topic, consumed, materialized)
 
@@ -177,10 +177,12 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
     "Use #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier) instead.",
     "2.7.0"
   )
-  def addGlobalStore[K, V](storeBuilder: StoreBuilder[_ <: StateStore],
-                           topic: String,
-                           consumed: Consumed[K, V],
-                           stateUpdateSupplier: ProcessorSupplier[K, V]): StreamsBuilderJ =
+  def addGlobalStore[K, V](
+    storeBuilder: StoreBuilder[_ <: StateStore],
+    topic: String,
+    consumed: Consumed[K, V],
+    stateUpdateSupplier: ProcessorSupplier[K, V]
+  ): StreamsBuilderJ =
     inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
index 102c257..c606c00 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
@@ -110,7 +110,7 @@ class BranchedKStream[K, V](val inner: BranchedKStreamJ[K, V]) {
   def noDefaultBranch(): Map[String, KStream[K, V]] = toScalaMap(inner.noDefaultBranch())
 
   private def toScalaMap(m: util.Map[String, kstream.KStream[K, V]]): collection.immutable.Map[String, KStream[K, V]] =
-    m.asScala.map {
-      case (name, kStreamJ) => (name, new KStream(kStreamJ))
+    m.asScala.map { case (name, kStreamJ) =>
+      (name, new KStream(kStreamJ))
     }.toMap
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
index f4fe9fc..2bf58ca 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
@@ -43,8 +43,10 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
    * @param aggregator    a function that computes a new aggregate result
    * @return a [[CogroupedKStream]]
    */
-  def cogroup[VIn](groupedStream: KGroupedStream[KIn, VIn],
-                   aggregator: (KIn, VIn, VOut) => VOut): CogroupedKStream[KIn, VOut] =
+  def cogroup[VIn](
+    groupedStream: KGroupedStream[KIn, VIn],
+    aggregator: (KIn, VIn, VOut) => VOut
+  ): CogroupedKStream[KIn, VOut] =
     new CogroupedKStream(inner.cogroup(groupedStream.inner, aggregator.asAggregator))
 
   /**
@@ -58,8 +60,8 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
    *         (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.CogroupedKStream#aggregate`
    */
-  def aggregate(initializer: => VOut)(
-    implicit materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
+  def aggregate(initializer: => VOut)(implicit
+    materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
   ): KTable[KIn, VOut] = new KTable(inner.aggregate((() => initializer).asInitializer, materialized))
 
   /**
@@ -74,8 +76,8 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
    *         (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.CogroupedKStream#aggregate`
    */
-  def aggregate(initializer: => VOut, named: Named)(
-    implicit materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
+  def aggregate(initializer: => VOut, named: Named)(implicit
+    materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
   ): KTable[KIn, VOut] = new KTable(inner.aggregate((() => initializer).asInitializer, named, materialized))
 
   /**
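
The cogroup API merges several grouped streams into a single aggregation; only the parameter layout changes in this file. A hedged sketch of a two-input running total, assuming illustrative topic names and the standard implicit imports:

    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.serialization.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.KTable

    val builder = new StreamsBuilder
    val clicks = builder.stream[String, Int]("clicks").groupByKey
    val views  = builder.stream[String, Int]("views").groupByKey

    // Both inputs fold into one Long total per key; the implicit
    // Materialized is derived from the imported serdes.
    val totals: KTable[String, Long] =
      clicks
        .cogroup((_: String, v: Int, agg: Long) => agg + v)
        .cogroup(views, (_: String, v: Int, agg: Long) => agg + v)
        .aggregate(0L)
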
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
index a105ed6..714df97 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
@@ -61,8 +61,9 @@ object Consumed {
    * @tparam V                 value type
    * @return a new instance of [[Consumed]]
    */
-  def `with`[K, V](timestampExtractor: TimestampExtractor)(implicit keySerde: Serde[K],
-                                                           valueSerde: Serde[V]): ConsumedJ[K, V] =
+  def `with`[K, V](
+    timestampExtractor: TimestampExtractor
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
     ConsumedJ.`with`(timestampExtractor).withKeySerde(keySerde).withValueSerde(valueSerde)
 
   /**
@@ -73,7 +74,8 @@ object Consumed {
    * @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config will be used
    * @return a new instance of [[Consumed]]
    */
-  def `with`[K, V](resetPolicy: Topology.AutoOffsetReset)(implicit keySerde: Serde[K],
-                                                          valueSerde: Serde[V]): ConsumedJ[K, V] =
+  def `with`[K, V](
+    resetPolicy: Topology.AutoOffsetReset
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
     ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
index b6dbb05..c614e14 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
@@ -34,9 +34,11 @@ object Joined {
    * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
    * @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes
    */
-  def `with`[K, V, VO](implicit keySerde: Serde[K],
-                       valueSerde: Serde[V],
-                       otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
+  def `with`[K, V, VO](implicit
+    keySerde: Serde[K],
+    valueSerde: Serde[V],
+    otherValueSerde: Serde[VO]
+  ): JoinedJ[K, V, VO] =
     JoinedJ.`with`(keySerde, valueSerde, otherValueSerde)
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 44a3e56..60a9c57 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -111,8 +111,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
    */
-  def reduce(reducer: (V, V) => V,
-             named: Named)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+  def reduce(reducer: (V, V) => V, named: Named)(implicit
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]
+  ): KTable[K, V] =
     new KTable(inner.reduce(reducer.asReducer, materialized))
 
   /**
@@ -125,8 +126,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
   ): KTable[K, VR] =
     new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized))
 
@@ -141,8 +142,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
   ): KTable[K, VR] =
     new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, named, materialized))
 
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 292155d..3d9e052 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -76,8 +76,9 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
    */
-  def reduce(adder: (V, V) => V,
-             subtractor: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+  def reduce(adder: (V, V) => V, subtractor: (V, V) => V)(implicit
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]
+  ): KTable[K, V] =
     new KTable(inner.reduce(adder.asReducer, subtractor.asReducer, materialized))
 
   /**
@@ -92,8 +93,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
    */
-  def reduce(adder: (V, V) => V, subtractor: (V, V) => V, named: Named)(
-    implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]
+  def reduce(adder: (V, V) => V, subtractor: (V, V) => V, named: Named)(implicit
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]
   ): KTable[K, V] =
     new KTable(inner.reduce(adder.asReducer, subtractor.asReducer, named, materialized))
 
@@ -109,8 +110,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
   ): KTable[K, VR] =
     new KTable(
       inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
@@ -129,14 +130,16 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
    */
-  def aggregate[VR](initializer: => VR, named: Named)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def aggregate[VR](initializer: => VR, named: Named)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
   ): KTable[K, VR] =
     new KTable(
-      inner.aggregate((() => initializer).asInitializer,
-                      adder.asAggregator,
-                      subtractor.asAggregator,
-                      named,
-                      materialized)
+      inner.aggregate(
+        (() => initializer).asInitializer,
+        adder.asAggregator,
+        subtractor.asAggregator,
+        named,
+        materialized
+      )
     )
 }
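
`KGroupedTable` aggregations take both an adder and a subtractor because upstream table updates retract the old value before applying the new one. A minimal sketch (topic and field semantics are illustrative):

    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.serialization.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.KTable

    val builder = new StreamsBuilder
    val userRegions: KTable[String, String] = builder.table[String, String]("user-regions")

    // Count users per region: the adder runs for a user's new region,
    // the subtractor retracts the count for the old one.
    val usersPerRegion: KTable[String, Long] =
      userRegions
        .groupBy((user, region) => (region, user))
        .aggregate(0L)((_, _, agg) => agg + 1, (_, _, agg) => agg - 1)
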
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index d097b85..dedb424 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -558,8 +558,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
    */
-  def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
-                        stateStoreNames: String*): KStream[K1, V1] =
+  def transform[K1, V1](
+    transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
+    stateStoreNames: String*
+  ): KStream[K1, V1] =
     new KStream(inner.transform(transformerSupplier, stateStoreNames: _*))
 
   /**
@@ -578,9 +580,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
    */
-  def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
-                        named: Named,
-                        stateStoreNames: String*): KStream[K1, V1] =
+  def transform[K1, V1](
+    transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
+    named: Named,
+    stateStoreNames: String*
+  ): KStream[K1, V1] =
     new KStream(inner.transform(transformerSupplier, named, stateStoreNames: _*))
 
   /**
@@ -598,8 +602,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
    */
-  def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
-                            stateStoreNames: String*): KStream[K1, V1] =
+  def flatTransform[K1, V1](
+    transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
+    stateStoreNames: String*
+  ): KStream[K1, V1] =
     new KStream(inner.flatTransform(transformerSupplier.asJava, stateStoreNames: _*))
 
   /**
@@ -618,9 +624,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transform`
    */
-  def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
-                            named: Named,
-                            stateStoreNames: String*): KStream[K1, V1] =
+  def flatTransform[K1, V1](
+    transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
+    named: Named,
+    stateStoreNames: String*
+  ): KStream[K1, V1] =
     new KStream(inner.flatTransform(transformerSupplier.asJava, named, stateStoreNames: _*))
 
   /**
@@ -638,8 +646,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
-                              stateStoreNames: String*): KStream[K, VR] =
+  def flatTransformValues[VR](
+    valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
+    stateStoreNames: String*
+  ): KStream[K, VR] =
     new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*))
 
   /**
@@ -658,9 +668,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
-                              named: Named,
-                              stateStoreNames: String*): KStream[K, VR] =
+  def flatTransformValues[VR](
+    valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
+    named: Named,
+    stateStoreNames: String*
+  ): KStream[K, VR] =
     new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*))
 
   /**
@@ -678,8 +690,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
-                              stateStoreNames: String*): KStream[K, VR] =
+  def flatTransformValues[VR](
+    valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
+    stateStoreNames: String*
+  ): KStream[K, VR] =
     new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*))
 
   /**
@@ -698,9 +712,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
-                              named: Named,
-                              stateStoreNames: String*): KStream[K, VR] =
+  def flatTransformValues[VR](
+    valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
+    named: Named,
+    stateStoreNames: String*
+  ): KStream[K, VR] =
     new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*))
 
   /**
@@ -717,8 +733,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR],
-                          stateStoreNames: String*): KStream[K, VR] =
+  def transformValues[VR](
+    valueTransformerSupplier: ValueTransformerSupplier[V, VR],
+    stateStoreNames: String*
+  ): KStream[K, VR] =
     new KStream(inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*))
 
   /**
@@ -736,9 +754,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR],
-                          named: Named,
-                          stateStoreNames: String*): KStream[K, VR] =
+  def transformValues[VR](
+    valueTransformerSupplier: ValueTransformerSupplier[V, VR],
+    named: Named,
+    stateStoreNames: String*
+  ): KStream[K, VR] =
     new KStream(inner.transformValues[VR](valueTransformerSupplier, named, stateStoreNames: _*))
 
   /**
@@ -755,8 +775,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
-                          stateStoreNames: String*): KStream[K, VR] =
+  def transformValues[VR](
+    valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
+    stateStoreNames: String*
+  ): KStream[K, VR] =
     new KStream(inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*))
 
   /**
@@ -774,9 +796,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
-                          named: Named,
-                          stateStoreNames: String*): KStream[K, VR] =
+  def transformValues[VR](
+    valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
+    named: Named,
+    stateStoreNames: String*
+  ): KStream[K, VR] =
     new KStream(inner.transformValues[VR](valueTransformerSupplier, named, stateStoreNames: _*))
 
   /**
@@ -792,8 +816,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#process`
    */
   @deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.")
-  def process(processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
-              stateStoreNames: String*): Unit = {
+  def process(
+    processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
+    stateStoreNames: String*
+  ): Unit = {
     val processorSupplierJ: org.apache.kafka.streams.processor.ProcessorSupplier[K, V] = () => processorSupplier()
     inner.process(processorSupplierJ, stateStoreNames: _*)
   }
@@ -830,9 +856,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#process`
    */
   @deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.")
-  def process(processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
-              named: Named,
-              stateStoreNames: String*): Unit = {
+  def process(
+    processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
+    named: Named,
+    stateStoreNames: String*
+  ): Unit = {
     val processorSupplierJ: org.apache.kafka.streams.processor.ProcessorSupplier[K, V] = () => processorSupplier()
     inner.process(processorSupplierJ, named, stateStoreNames: _*)
   }
@@ -1039,7 +1067,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    */
   def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
     keyValueMapper: (K, V) => GK,
-    joiner: (V, GV) => RV,
+    joiner: (V, GV) => RV
   ): KStream[K, RV] =
     new KStream(
       inner.join[GK, GV, RV](
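
The only change to the global-table join above is dropping a trailing comma. For reference, a sketch of how this signature is invoked (topic and value formats are illustrative):

    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.serialization.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.KStream
    import org.apache.kafka.streams.kstream.GlobalKTable

    val builder = new StreamsBuilder
    val orders: KStream[String, String] = builder.stream[String, String]("orders")
    val products: GlobalKTable[String, String] = builder.globalTable[String, String]("products")

    // keyValueMapper picks the lookup key; joiner combines the values.
    val enriched: KStream[String, String] =
      orders.join(products)(
        (_, productId) => productId,
        (order, product) => s"$order/$product"
      )
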
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 7aba69d..892f39e 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -86,9 +86,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains only those records that satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KTable#filter`
    */
-  def filter(predicate: (K, V) => Boolean,
-             named: Named,
-             materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+  def filter(
+    predicate: (K, V) => Boolean,
+    named: Named,
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]
+  ): KTable[K, V] =
     new KTable(inner.filter(predicate.asPredicate, named, materialized))
 
   /**
@@ -138,9 +140,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains only those records that do <em>not</em> satisfy the given predicate
    * @see `org.apache.kafka.streams.kstream.KTable#filterNot`
    */
-  def filterNot(predicate: (K, V) => Boolean,
-                named: Named,
-                materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+  def filterNot(
+    predicate: (K, V) => Boolean,
+    named: Named,
+    materialized: Materialized[K, V, ByteArrayKeyValueStore]
+  ): KTable[K, V] =
     new KTable(inner.filterNot(predicate.asPredicate, named, materialized))
 
   /**
@@ -198,9 +202,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
    */
-  def mapValues[VR](mapper: V => VR,
-                    named: Named,
-                    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+  def mapValues[VR](
+    mapper: V => VR,
+    named: Named,
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     new KTable(inner.mapValues[VR](mapper.asValueMapper, named, materialized))
 
   /**
@@ -258,9 +264,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KTable#mapValues`
    */
-  def mapValues[VR](mapper: (K, V) => VR,
-                    named: Named,
-                    materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+  def mapValues[VR](
+    mapper: (K, V) => VR,
+    named: Named,
+    materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  ): KTable[K, VR] =
     new KTable(inner.mapValues[VR](mapper.asValueMapperWithKey, named, materialized))
 
   /**
@@ -337,8 +345,10 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
-                          stateStoreNames: String*): KTable[K, VR] =
+  def transformValues[VR](
+    valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
+    stateStoreNames: String*
+  ): KTable[K, VR] =
     new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*))
 
   /**
@@ -364,9 +374,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
-                          named: Named,
-                          stateStoreNames: String*): KTable[K, VR] =
+  def transformValues[VR](
+    valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
+    named: Named,
+    stateStoreNames: String*
+  ): KTable[K, VR] =
     new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, named, stateStoreNames: _*))
 
   /**
@@ -389,9 +401,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
-                          materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
-                          stateStoreNames: String*): KTable[K, VR] =
+  def transformValues[VR](
+    valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
+    stateStoreNames: String*
+  ): KTable[K, VR] =
     new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*))
 
   /**
@@ -415,10 +429,12 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
-  def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
-                          materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
-                          named: Named,
-                          stateStoreNames: String*): KTable[K, VR] =
+  def transformValues[VR](
+    valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
+    named: Named,
+    stateStoreNames: String*
+  ): KTable[K, VR] =
     new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, named, stateStoreNames: _*))
 
   /**
@@ -619,10 +635,12 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    *         one for each matched record-pair with the same key
    */
-  def join[VR, KO, VO](other: KTable[KO, VO],
-                       keyExtractor: Function[V, KO],
-                       joiner: ValueJoiner[V, VO, VR],
-                       materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] =
+  def join[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: Function[V, KO],
+    joiner: ValueJoiner[V, VO, VR],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
     new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, materialized))
 
   /**
@@ -638,11 +656,13 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    *         one for each matched record-pair with the same key
    */
-  def join[VR, KO, VO](other: KTable[KO, VO],
-                       keyExtractor: Function[V, KO],
-                       joiner: ValueJoiner[V, VO, VR],
-                       named: Named,
-                       materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] =
+  def join[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: Function[V, KO],
+    joiner: ValueJoiner[V, VO, VR],
+    named: Named,
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
     new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized))
 
   /**
@@ -657,10 +677,12 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    *         one for each matched record-pair with the same key
    */
-  def leftJoin[VR, KO, VO](other: KTable[KO, VO],
-                           keyExtractor: Function[V, KO],
-                           joiner: ValueJoiner[V, VO, VR],
-                           materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] =
+  def leftJoin[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: Function[V, KO],
+    joiner: ValueJoiner[V, VO, VR],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
     new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, materialized))
 
   /**
@@ -676,11 +698,13 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
    *         one for each matched record-pair with the same key
    */
-  def leftJoin[VR, KO, VO](other: KTable[KO, VO],
-                           keyExtractor: Function[V, KO],
-                           joiner: ValueJoiner[V, VO, VR],
-                           named: Named,
-                           materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] =
+  def leftJoin[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: Function[V, KO],
+    joiner: ValueJoiner[V, VO, VR],
+    named: Named,
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
     new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized))
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
index eb126f0..421ac5a 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
@@ -50,8 +50,9 @@ object Materialized {
    * @param valueSerde the value serde to use.
    * @return a new [[Materialized]] instance with the given storeName
    */
-  def as[K, V, S <: StateStore](storeName: String)(implicit keySerde: Serde[K],
-                                                   valueSerde: Serde[V]): MaterializedJ[K, V, S] =
+  def as[K, V, S <: StateStore](
+    storeName: String
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, S] =
     MaterializedJ.as(storeName).withKeySerde(keySerde).withValueSerde(valueSerde)
 
   /**
@@ -68,8 +69,9 @@ object Materialized {
    * @param valueSerde the value serde to use.
    * @return a new [[Materialized]] instance with the given supplier
    */
-  def as[K, V](supplier: WindowBytesStoreSupplier)(implicit keySerde: Serde[K],
-                                                   valueSerde: Serde[V]): MaterializedJ[K, V, ByteArrayWindowStore] =
+  def as[K, V](
+    supplier: WindowBytesStoreSupplier
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, ByteArrayWindowStore] =
     MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
 
   /**
@@ -86,8 +88,9 @@ object Materialized {
    * @param valueSerde the value serde to use.
    * @return a new [[Materialized]] instance with the given supplier
    */
-  def as[K, V](supplier: SessionBytesStoreSupplier)(implicit keySerde: Serde[K],
-                                                    valueSerde: Serde[V]): MaterializedJ[K, V, ByteArraySessionStore] =
+  def as[K, V](
+    supplier: SessionBytesStoreSupplier
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, ByteArraySessionStore] =
     MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
 
   /**
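
These `as` overloads only gain an explicit parameter-list split; the serdes still arrive implicitly. A sketch of naming a key-value state store (the store name is illustrative):

    import org.apache.kafka.streams.scala.serialization.Serdes._
    import org.apache.kafka.streams.scala.kstream.Materialized
    import org.apache.kafka.streams.scala.ByteArrayKeyValueStore

    // A Materialized for a named key-value store; keySerde and
    // valueSerde are filled in from the implicits imported above.
    val mat = Materialized.as[String, Long, ByteArrayKeyValueStore]("clicks-store")
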
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
index 351e0a5..48f9178 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
@@ -53,7 +53,8 @@ object Produced {
    * @see KStream#through(String, Produced)
    * @see KStream#to(String, Produced)
    */
-  def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde: Serde[K],
-                                                         valueSerde: Serde[V]): ProducedJ[K, V] =
+  def `with`[K, V](
+    partitioner: StreamPartitioner[K, V]
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ProducedJ[K, V] =
     ProducedJ.`with`(keySerde, valueSerde, partitioner)
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
index 06a0f4f..5f33efa 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
@@ -65,8 +65,9 @@ object Repartitioned {
    * @return A new [[Repartitioned]] instance configured with keySerde, valueSerde, and partitioner
    * @see KStream#repartition(Repartitioned)
    */
-  def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde: Serde[K],
-                                                         valueSerde: Serde[V]): RepartitionedJ[K, V] =
+  def `with`[K, V](
+    partitioner: StreamPartitioner[K, V]
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): RepartitionedJ[K, V] =
     RepartitionedJ.`streamPartitioner`(partitioner).withKeySerde(keySerde).withValueSerde(valueSerde)
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala
index e5c5823..1b20179 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala
@@ -40,8 +40,8 @@ class SessionWindowedCogroupedKStream[K, V](val inner: SessionWindowedCogroupedK
    *         the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream#aggregate`
    */
-  def aggregate(initializer: => V, merger: (K, V, V) => V)(
-    implicit materialized: Materialized[K, V, ByteArraySessionStore]
+  def aggregate(initializer: => V, merger: (K, V, V) => V)(implicit
+    materialized: Materialized[K, V, ByteArraySessionStore]
   ): KTable[Windowed[K], V] =
     new KTable(inner.aggregate((() => initializer).asInitializer, merger.asMerger, materialized))
 
@@ -56,8 +56,8 @@ class SessionWindowedCogroupedKStream[K, V](val inner: SessionWindowedCogroupedK
    *         the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream#aggregate`
    */
-  def aggregate(initializer: => V, merger: (K, V, V) => V, named: Named)(
-    implicit materialized: Materialized[K, V, ByteArraySessionStore]
+  def aggregate(initializer: => V, merger: (K, V, V) => V, named: Named)(implicit
+    materialized: Materialized[K, V, ByteArraySessionStore]
   ): KTable[Windowed[K], V] =
     new KTable(inner.aggregate((() => initializer).asInitializer, merger.asMerger, named, materialized))
 
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 0a20444..3d6e157 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -49,8 +49,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    *         the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArraySessionStore]
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArraySessionStore]
   ): KTable[Windowed[K], VR] =
     new KTable(
       inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
@@ -68,8 +68,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    *         the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArraySessionStore]
+  def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArraySessionStore]
   ): KTable[Windowed[K], VR] =
     new KTable(
       inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, named, materialized)
@@ -127,8 +127,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    *         the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
    */
-  def reduce(reducer: (V, V) => V)(
-    implicit materialized: Materialized[K, V, ByteArraySessionStore]
+  def reduce(reducer: (V, V) => V)(implicit
+    materialized: Materialized[K, V, ByteArraySessionStore]
   ): KTable[Windowed[K], V] =
     new KTable(inner.reduce(reducer.asReducer, materialized))
 
@@ -141,8 +141,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
    *         the latest (rolling) aggregate for each key within a window
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
    */
-  def reduce(reducer: (V, V) => V, named: Named)(
-    implicit materialized: Materialized[K, V, ByteArraySessionStore]
+  def reduce(reducer: (V, V) => V, named: Named)(implicit
+    materialized: Materialized[K, V, ByteArraySessionStore]
   ): KTable[Windowed[K], V] =
     new KTable(inner.reduce(reducer.asReducer, named, materialized))
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala
index 15bda130..9caad63 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala
@@ -35,9 +35,11 @@ object StreamJoined {
    * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
    * @return new [[StreamJoined]] instance with the provided serdes
    */
-  def `with`[K, V, VO](implicit keySerde: Serde[K],
-                       valueSerde: Serde[V],
-                       otherValueSerde: Serde[VO]): StreamJoinedJ[K, V, VO] =
+  def `with`[K, V, VO](implicit
+    keySerde: Serde[K],
+    valueSerde: Serde[V],
+    otherValueSerde: Serde[VO]
+  ): StreamJoinedJ[K, V, VO] =
     StreamJoinedJ.`with`(keySerde, valueSerde, otherValueSerde)
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala
index e9962a6..ad24228 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala
@@ -39,8 +39,8 @@ class TimeWindowedCogroupedKStream[K, V](val inner: TimeWindowedCogroupedKStream
    *         (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream#aggregate`
    */
-  def aggregate(initializer: => V)(
-    implicit materialized: Materialized[K, V, ByteArrayWindowStore]
+  def aggregate(initializer: => V)(implicit
+    materialized: Materialized[K, V, ByteArrayWindowStore]
   ): KTable[Windowed[K], V] =
     new KTable(inner.aggregate((() => initializer).asInitializer, materialized))
 
@@ -54,8 +54,8 @@ class TimeWindowedCogroupedKStream[K, V](val inner: TimeWindowedCogroupedKStream
    *         (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream#aggregate`
    */
-  def aggregate(initializer: => V, named: Named)(
-    implicit materialized: Materialized[K, V, ByteArrayWindowStore]
+  def aggregate(initializer: => V, named: Named)(implicit
+    materialized: Materialized[K, V, ByteArrayWindowStore]
   ): KTable[Windowed[K], V] =
     new KTable(inner.aggregate((() => initializer).asInitializer, named, materialized))
 
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index fdb137c..4fcf227 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -47,8 +47,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArrayWindowStore]
+  def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArrayWindowStore]
   ): KTable[Windowed[K], VR] =
     new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized))
 
@@ -63,8 +63,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
    */
-  def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(
-    implicit materialized: Materialized[K, VR, ByteArrayWindowStore]
+  def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(implicit
+    materialized: Materialized[K, VR, ByteArrayWindowStore]
   ): KTable[Windowed[K], VR] =
     new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, named, materialized))
 
@@ -120,8 +120,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
    */
-  def reduce(reducer: (V, V) => V)(
-    implicit materialized: Materialized[K, V, ByteArrayWindowStore]
+  def reduce(reducer: (V, V) => V)(implicit
+    materialized: Materialized[K, V, ByteArrayWindowStore]
   ): KTable[Windowed[K], V] =
     new KTable(inner.reduce(reducer.asReducer, materialized))
 
@@ -135,8 +135,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
    *         latest (rolling) aggregate for each key
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
    */
-  def reduce(reducer: (V, V) => V, named: Named)(
-    implicit materialized: Materialized[K, V, ByteArrayWindowStore]
+  def reduce(reducer: (V, V) => V, named: Named)(implicit
+    materialized: Materialized[K, V, ByteArrayWindowStore]
   ): KTable[Windowed[K], V] =
     new KTable(inner.reduce(reducer.asReducer, materialized))
 }
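
The windowed `reduce` overloads above follow the same `(implicit` reflow. A hedged sketch of the call site (topic name illustrative; `TimeWindows.of` is the spelling current at this commit):

    import java.time.Duration
    import org.apache.kafka.streams.kstream.TimeWindows
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.serialization.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder

    val builder = new StreamsBuilder
    // Per-key sums over tumbling five-minute windows; the implicit
    // Materialized for the window store is derived from the serdes.
    val sums = builder
      .stream[String, Long]("amounts")
      .groupByKey
      .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
      .reduce(_ + _)
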
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
index c4e7537..0c72358 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
@@ -57,8 +57,10 @@ object Serdes extends LowPrioritySerdes {
       }
     )
 
-  def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
-                        deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+  def fromFn[T >: Null](
+    serializer: (String, T) => Array[Byte],
+    deserializer: (String, Array[Byte]) => Option[T]
+  ): Serde[T] =
     JSerdes.serdeFrom(
       new Serializer[T] {
         override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
@@ -75,13 +77,13 @@ object Serdes extends LowPrioritySerdes {
 
 trait LowPrioritySerdes {
 
-  implicit val nullSerde: Serde[Null] = {
+  implicit val nullSerde: Serde[Null] =
     Serdes.fromFn[Null](
       { _: Null =>
         null
-      }, { _: Array[Byte] =>
+      },
+      { _: Array[Byte] =>
         None
       }
     )
-  }
 }
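
Two changes here: the `fromFn` parameters are wrapped one per line, and the
`RedundantBraces` rewrite rule drops the braces around the `nullSerde` body.
For context, `fromFn` builds a `Serde[T]` from a serializer and a deserializer
function, where the deserializer returns `Option[T]` (a `None` surfaces as a
null value). A minimal usage sketch (the UTF-8 serde below is illustrative,
not part of the patch):

    import java.nio.charset.StandardCharsets
    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.streams.scala.serialization.Serdes

    object Utf8SerdeExample {
      // Topic-aware serializer and deserializer; the topic argument is unused here.
      val utf8Serde: Serde[String] = Serdes.fromFn[String](
        (_: String, s: String) => s.getBytes(StandardCharsets.UTF_8),
        (_: String, bytes: Array[Byte]) =>
          Option(bytes).map(new String(_, StandardCharsets.UTF_8))
      )
    }
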
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 56b1c29..e9577bc 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -54,7 +54,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
     val clicksPerRegion: KTable[String, Long] =
       userClicksStream
 
-      // Join the stream against the table.
+        // Join the stream against the table.
         .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
 
         // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
@@ -98,7 +98,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
     val clicksPerRegion: KTable[String, Long] =
       userClicksStream
 
-      // Join the stream against the table.
+        // Join the stream against the table.
         .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
 
         // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
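
Both hunks in this file only re-indent a comment so it lines up with the
chained calls it describes. For context, the call being commented is the Scala
API's curried stream-table `leftJoin`; a simplified sketch with made-up topic
names, assuming implicit serdes are in scope:

    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.serialization.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

    object LeftJoinSketch {
      val builder = new StreamsBuilder
      val userClicks: KStream[String, Long] = builder.stream[String, Long]("user-clicks")
      val userRegions: KTable[String, String] = builder.table[String, String]("user-regions")

      // In a stream-table left join, `region` is null when the table has no
      // entry for the key, hence the explicit null check.
      val joined: KStream[String, (String, Long)] =
        userClicks.leftJoin(userRegions)((clicks, region) =>
          (if (region == null) "UNKNOWN" else region, clicks)
        )
    }
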
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index 92ca5bd..926ba43 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -287,13 +287,12 @@ class TopologyTest {
       val textLines = streamBuilder.stream[String, String](inputTopic)
 
       val _: KTable[String, Long] = textLines
-        .transform(
-          () =>
-            new Transformer[String, String, KeyValue[String, String]] {
-              override def init(context: ProcessorContext): Unit = ()
-              override def transform(key: String, value: String): KeyValue[String, String] =
-                new KeyValue(key, value.toLowerCase)
-              override def close(): Unit = ()
+        .transform(() =>
+          new Transformer[String, String, KeyValue[String, String]] {
+            override def init(context: ProcessorContext): Unit = ()
+            override def transform(key: String, value: String): KeyValue[String, String] =
+              new KeyValue(key, value.toLowerCase)
+            override def close(): Unit = ()
           }
         )
         .groupBy((_, v) => v)
@@ -308,13 +307,12 @@ class TopologyTest {
       val streamBuilder = new StreamsBuilderJ
       val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
 
-      val lowered: KStreamJ[String, String] = textLines.transform(
-        () =>
-          new Transformer[String, String, KeyValue[String, String]] {
-            override def init(context: ProcessorContext): Unit = ()
-            override def transform(key: String, value: String): KeyValue[String, String] =
-              new KeyValue(key, value.toLowerCase)
-            override def close(): Unit = ()
+      val lowered: KStreamJ[String, String] = textLines.transform(() =>
+        new Transformer[String, String, KeyValue[String, String]] {
+          override def init(context: ProcessorContext): Unit = ()
+          override def transform(key: String, value: String): KeyValue[String, String] =
+            new KeyValue(key, value.toLowerCase)
+          override def close(): Unit = ()
         }
       )
 
@@ -378,16 +376,20 @@ class TopologyTest {
 
       mappedStream
         .filter((k: String, _: String) => k == "A")
-        .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString,
-                       JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))(
+        .join(stream2)(
+          (v1: String, v2: Int) => v1 + ":" + v2.toString,
+          JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24))
+        )(
           StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde)
         )
         .to(JOINED_TOPIC)
 
       mappedStream
         .filter((k: String, _: String) => k == "A")
-        .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString,
-                       JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24)))(
+        .join(stream3)(
+          (v1: String, v2: String) => v1 + ":" + v2.toString,
+          JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24))
+        )(
           StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde)
         )
         .to(JOINED_TOPIC)
@@ -457,10 +459,14 @@ class TopologyTest {
       builder
     }
 
-    assertNotEquals(getTopologyScala.build(props).describe.toString,
-                    getTopologyScala.build(propsNoOptimization).describe.toString)
-    assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString,
-                 getTopologyJava.build(propsNoOptimization).describe.toString)
+    assertNotEquals(
+      getTopologyScala.build(props).describe.toString,
+      getTopologyScala.build(propsNoOptimization).describe.toString
+    )
+    assertEquals(
+      getTopologyScala.build(propsNoOptimization).describe.toString,
+      getTopologyJava.build(propsNoOptimization).describe.toString
+    )
     assertEquals(getTopologyScala.build(props).describe.toString, getTopologyJava.build(props).describe.toString)
   }
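
The reformatted join calls above use the curried Scala signature: the joiner
function and the `JoinWindows` share the first argument list, while the
`StreamJoined` sits in a second, implicit parameter list that the test fills
in explicitly. A compact sketch with made-up topics, relying on implicit
serdes for the `StreamJoined`:

    import java.time.Duration
    import org.apache.kafka.streams.kstream.JoinWindows
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.serialization.Serdes._
    import org.apache.kafka.streams.scala.StreamsBuilder

    object WindowedJoinSketch {
      val builder = new StreamsBuilder
      val left = builder.stream[String, String]("left-topic")
      val right = builder.stream[String, Int]("right-topic")

      // Records join when their timestamps lie within 5 seconds of each
      // other; out-of-order records are accepted for up to 24 hours of grace.
      left
        .join(right)(
          (l, r) => s"$l:$r",
          JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), Duration.ofHours(24))
        )
        .to("joined-topic")
    }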
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
index 5b2aa76..984cb74 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
@@ -100,36 +100,44 @@ class StreamToTableJoinScalaIntegrationTestBase extends StreamToTableJoinTestDat
     p
   }
 
-  def produceNConsume(userClicksTopic: String,
-                      userRegionsTopic: String,
-                      outputTopic: String,
-                      waitTillRecordsReceived: Boolean = true): java.util.List[KeyValue[String, Long]] = {
+  def produceNConsume(
+    userClicksTopic: String,
+    userRegionsTopic: String,
+    outputTopic: String,
+    waitTillRecordsReceived: Boolean = true
+  ): java.util.List[KeyValue[String, Long]] = {
 
     import _root_.scala.jdk.CollectionConverters._
 
     // Publish user-region information.
     val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
-    IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
-                                                       userRegions.asJava,
-                                                       userRegionsProducerConfig,
-                                                       mockTime,
-                                                       false)
+    IntegrationTestUtils.produceKeyValuesSynchronously(
+      userRegionsTopic,
+      userRegions.asJava,
+      userRegionsProducerConfig,
+      mockTime,
+      false
+    )
 
     // Publish user-click information.
     val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
-    IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
-                                                       userClicks.asJava,
-                                                       userClicksProducerConfig,
-                                                       mockTime,
-                                                       false)
+    IntegrationTestUtils.produceKeyValuesSynchronously(
+      userClicksTopic,
+      userClicks.asJava,
+      userClicksProducerConfig,
+      mockTime,
+      false
+    )
 
     if (waitTillRecordsReceived) {
       // consume and verify result
       val consumerConfig = getConsumerConfig()
 
-      IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig,
-                                                                 outputTopic,
-                                                                 expectedClicksPerRegion.asJava)
+      IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+        consumerConfig,
+        outputTopic,
+        expectedClicksPerRegion.asJava
+      )
     } else {
       java.util.Collections.emptyList()
     }
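
This last file shows the commit's other recurring change: scalafmt 2.x no
longer aligns wrapped arguments under the opening parenthesis (the 2.x
defaults for open-paren alignment appear to have changed) and instead breaks
each argument onto its own two-space-indented line. A before/after sketch
with a made-up helper:

    object ArgumentWrapStyle {
      def produce(topic: String, key: String, value: String, sync: Boolean): Unit = ()

      // scalafmt 1.x aligned wrapped arguments under the opening parenthesis:
      //   produce("events",
      //           "k",
      //           "v",
      //           true)
      // scalafmt 2.x puts each argument on its own indented line:
      produce(
        "events",
        "k",
        "v",
        sync = true
      )
    }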
