kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7396: Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes (#5551)
Date Tue, 11 Sep 2018 21:08:52 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new acd3858  KAFKA-7396: Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes (#5551)
acd3858 is described below

commit acd3858ea69e676a7840d998240deb32aee62dc0
Author: Joan Goyeau <joan@goyeau.com>
AuthorDate: Tue Sep 11 22:08:42 2018 +0100

    KAFKA-7396: Materialized, Serialized, Joined, Consumed and Produced with implicit Serdes (#5551)
    
    We want to make sure that we always have a serde for all Materialized, Serialized, Joined, Consumed and Produced.
    For that we can make use of the implicit parameters in Scala.
    
    KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-365%3A+Materialized%2C+Serialized%2C+Joined%2C+Consumed+and+Produced+with+implicit+Serde
    
    Reviewers: John Roesler <vvcephei@users.noreply.github.com>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <guozhang@confluent.io>, Ted Yu <yuzhihong@gmail.com>
---
 .../org/apache/kafka/streams/kstream/Produced.java |   3 +-
 .../kstream/internals/ProducedInternal.java        |   2 +-
 .../kstream/internals/SerializedInternal.java      |   9 +-
 .../kafka/streams/scala/ImplicitConversions.scala  |  15 ++-
 .../kafka/streams/scala/StreamsBuilder.scala       |   2 +-
 .../kafka/streams/scala/kstream/Consumed.scala     |  79 +++++++++++++++
 .../kafka/streams/scala/kstream/Joined.scala       |  42 ++++++++
 .../streams/scala/kstream/KGroupedStream.scala     |   4 +-
 .../streams/scala/kstream/KGroupedTable.scala      |   4 +-
 .../kafka/streams/scala/kstream/Materialized.scala | 107 +++++++++++++++++++++
 .../kafka/streams/scala/kstream/Produced.scala     |  59 ++++++++++++
 .../kafka/streams/scala/kstream/Serialized.scala   |  36 +++++++
 .../scala/kstream/TimeWindowedKStream.scala        |   4 +-
 .../kafka/streams/scala/kstream/package.scala}     |  24 ++---
 .../apache/kafka/streams/scala/TopologyTest.scala  |  21 +++-
 .../apache/kafka/streams/scala/WordCountTest.scala |   1 -
 .../kafka/streams/scala/kstream/ConsumedTest.scala |  73 ++++++++++++++
 .../kafka/streams/scala/kstream/JoinedTest.scala}  |  34 +++----
 .../streams/scala/{ => kstream}/KStreamTest.scala  |   5 +-
 .../streams/scala/{ => kstream}/KTableTest.scala   |   9 +-
 .../streams/scala/kstream/MaterializedTest.scala   |  84 ++++++++++++++++
 .../kafka/streams/scala/kstream/ProducedTest.scala |  51 ++++++++++
 .../streams/scala/kstream/SerializedTest.scala}    |  35 ++++---
 23 files changed, 615 insertions(+), 88 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
index 8d2742a..a3d96bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -71,7 +71,8 @@ public class Produced<K, V> {
      * @param valueSerde    Serde to use for serializing the value
      * @param partitioner   the function used to determine how records are distributed among partitions of the topic,
      *                      if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
-     *                      {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} wil be used
+     *                      {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner}
+     *                      will be used
      * @param <K>           key type
      * @param <V>           value type
      * @return  A new {@link Produced} instance configured with keySerde, valueSerde, and partitioner
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
index 3197244..358982b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
 public class ProducedInternal<K, V> extends Produced<K, V> {
-    ProducedInternal(final Produced<K, V> produced) {
+    public ProducedInternal(final Produced<K, V> produced) {
         super(produced);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
index fb802ea..c6df11f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
@@ -19,17 +19,16 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.Serialized;
 
-class SerializedInternal<K, V> extends Serialized<K, V> {
-    SerializedInternal(final Serialized<K, V> serialized) {
+public class SerializedInternal<K, V> extends Serialized<K, V> {
+    public SerializedInternal(final Serialized<K, V> serialized) {
         super(serialized);
     }
 
-    Serde<K> keySerde() {
+    public Serde<K> keySerde() {
         return keySerde;
     }
 
-    Serde<V> valueSerde() {
+    public Serde<V> valueSerde() {
         return valueSerde;
     }
-
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index d1ff674..c2ac1ff 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -25,14 +25,13 @@ import org.apache.kafka.streams.kstream.{
   KStream => KStreamJ,
   KTable => KTableJ,
   SessionWindowedKStream => SessionWindowedKStreamJ,
-  TimeWindowedKStream => TimeWindowedKStreamJ,
-  _
+  TimeWindowedKStream => TimeWindowedKStreamJ
 }
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.common.serialization.Serde
-import scala.language.implicitConversions
 
+import scala.language.implicitConversions
 import org.apache.kafka.streams.processor.StateStore
 
 /**
@@ -65,20 +64,20 @@ object ImplicitConversions {
   // and these implicits will convert them to `Serialized`, `Produced` or `Consumed`
 
   implicit def serializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
-    Serialized.`with`(keySerde, valueSerde)
+    Serialized.`with`[K, V]
 
   implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] =
-    Consumed.`with`(keySerde, valueSerde)
+    Consumed.`with`[K, V]
 
   implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
-    Produced.`with`(keySerde, valueSerde)
+    Produced.`with`[K, V]
 
   implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K],
                                                             valueSerde: Serde[V]): Materialized[K, V, S] =
-    Materialized.`with`[K, V, S](keySerde, valueSerde)
+    Materialized.`with`[K, V, S]
 
   implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
                                                       valueSerde: Serde[V],
                                                       otherValueSerde: Serde[VO]): Joined[K, V, VO] =
-    Joined.`with`(keySerde, valueSerde, otherValueSerde)
+    Joined.`with`[K, V, VO]
 }
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index fcec778..8c5a9b3 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -21,7 +21,7 @@ package org.apache.kafka.streams.scala
 
 import java.util.regex.Pattern
 
-import org.apache.kafka.streams.kstream.{Consumed, GlobalKTable, Materialized}
+import org.apache.kafka.streams.kstream.GlobalKTable
 import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
 import org.apache.kafka.streams.state.StoreBuilder
 import org.apache.kafka.streams.{Topology, StreamsBuilder => StreamsBuilderJ}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
new file mode 100644
index 0000000..a105ed6
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.processor.TimestampExtractor
+
+object Consumed {
+
+  /**
+   * Create an instance of [[Consumed]] with the supplied arguments. `null` values are acceptable.
+   *
+   * @tparam K                 key type
+   * @tparam V                 value type
+   * @param timestampExtractor the timestamp extractor to be used. If `null` the default timestamp extractor from
+   *                           config will be used
+   * @param resetPolicy        the offset reset policy to be used. If `null` the default reset policy from config
+   *                           will be used
+   * @param keySerde           the key serde to use.
+   * @param valueSerde         the value serde to use.
+   * @return a new instance of [[Consumed]]
+   */
+  def `with`[K, V](
+    timestampExtractor: TimestampExtractor,
+    resetPolicy: Topology.AutoOffsetReset
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
+    ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)
+
+  /**
+   * Create an instance of [[Consumed]] with key and value [[Serde]]s.
+   *
+   * @tparam K         key type
+   * @tparam V         value type
+   * @return a new instance of [[Consumed]]
+   */
+  def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
+    ConsumedJ.`with`(keySerde, valueSerde)
+
+  /**
+   * Create an instance of [[Consumed]] with a [[TimestampExtractor]].
+   *
+   * @param timestampExtractor the timestamp extractor to be used. If `null` the default timestamp extractor from
+   *                           config will be used
+   * @tparam K                 key type
+   * @tparam V                 value type
+   * @return a new instance of [[Consumed]]
+   */
+  def `with`[K, V](timestampExtractor: TimestampExtractor)(implicit keySerde: Serde[K],
+                                                           valueSerde: Serde[V]): ConsumedJ[K, V] =
+    ConsumedJ.`with`(timestampExtractor).withKeySerde(keySerde).withValueSerde(valueSerde)
+
+  /**
+   * Create an instance of [[Consumed]] with a [[Topology.AutoOffsetReset]].
+   *
+   * @tparam K          key type
+   * @tparam V          value type
+   * @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config will be used
+   * @return a new instance of [[Consumed]]
+   */
+  def `with`[K, V](resetPolicy: Topology.AutoOffsetReset)(implicit keySerde: Serde[K],
+                                                          valueSerde: Serde[V]): ConsumedJ[K, V] =
+    ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
new file mode 100644
index 0000000..ffd3e61
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Joined => JoinedJ}
+
+object Joined {
+
+  /**
+   * Create an instance of [[org.apache.kafka.streams.kstream.Joined]] with key, value, and otherValue [[Serde]]
+   * instances.
+   * `null` values are accepted and will be replaced by the default serdes as defined in config.
+   *
+   * @tparam K              key type
+   * @tparam V              value type
+   * @tparam VO             other value type
+   * @param keySerde        the key serde to use.
+   * @param valueSerde      the value serde to use.
+   * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
+   * @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes
+   */
+  def `with`[K, V, VO](implicit keySerde: Serde[K],
+                       valueSerde: Serde[V],
+                       otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
+    JoinedJ.`with`(keySerde, valueSerde, otherValueSerde)
+
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index f6a22d9..5d0f05e 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.scala.FunctionConversions._
 /**
  * Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
  *
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K Type of keys
+ * @tparam V Type of values
  * @param inner The underlying Java abstraction for KGroupedStream
  *
  * @see `org.apache.kafka.streams.kstream.KGroupedStream`
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 76ea9ed..56f84e3 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.scala.FunctionConversions._
 /**
  * Wraps the Java class KGroupedTable and delegates method calls to the underlying Java object.
  *
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K Type of keys
+ * @tparam V Type of values
  * @param inner The underlying Java abstraction for KGroupedTable
  *
  * @see `org.apache.kafka.streams.kstream.KGroupedTable`
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
new file mode 100644
index 0000000..eb126f0
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Materialized => MaterializedJ}
+import org.apache.kafka.streams.processor.StateStore
+import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, ByteArraySessionStore, ByteArrayWindowStore}
+import org.apache.kafka.streams.state.{KeyValueBytesStoreSupplier, SessionBytesStoreSupplier, WindowBytesStoreSupplier}
+
+object Materialized {
+
+  /**
+   * Materialize a [[StateStore]] with the provided key and value [[Serde]]s.
+   * An internal name will be used for the store.
+   *
+   * @tparam K         key type
+   * @tparam V         value type
+   * @tparam S         store type
+   * @param keySerde   the key [[Serde]] to use.
+   * @param valueSerde the value [[Serde]] to use.
+   * @return a new [[Materialized]] instance with the given key and value serdes
+   */
+  def `with`[K, V, S <: StateStore](implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, S] =
+    MaterializedJ.`with`(keySerde, valueSerde)
+
+  /**
+   * Materialize a [[StateStore]] with the given name.
+   *
+   * @tparam K         key type of the store
+   * @tparam V         value type of the store
+   * @tparam S         type of the [[StateStore]]
+   * @param storeName  the name of the underlying [[org.apache.kafka.streams.scala.kstream.KTable]] state store;
+   *                   valid characters are ASCII alphanumerics, '.', '_' and '-'.
+   * @param keySerde   the key serde to use.
+   * @param valueSerde the value serde to use.
+   * @return a new [[Materialized]] instance with the given storeName
+   */
+  def as[K, V, S <: StateStore](storeName: String)(implicit keySerde: Serde[K],
+                                                   valueSerde: Serde[V]): MaterializedJ[K, V, S] =
+    MaterializedJ.as(storeName).withKeySerde(keySerde).withValueSerde(valueSerde)
+
+  /**
+   * Materialize a [[org.apache.kafka.streams.state.WindowStore]] using the provided [[WindowBytesStoreSupplier]].
+   *
+   * Important: Custom subclasses are allowed here, but they should respect the retention contract:
+   * Window stores are required to retain windows at least as long as (window size + window grace period).
+   * Stores constructed via [[org.apache.kafka.streams.state.Stores]] already satisfy this contract.
+   *
+   * @tparam K         key type of the store
+   * @tparam V         value type of the store
+   * @param supplier   the [[WindowBytesStoreSupplier]] used to materialize the store
+   * @param keySerde   the key serde to use.
+   * @param valueSerde the value serde to use.
+   * @return a new [[Materialized]] instance with the given supplier
+   */
+  def as[K, V](supplier: WindowBytesStoreSupplier)(implicit keySerde: Serde[K],
+                                                   valueSerde: Serde[V]): MaterializedJ[K, V, ByteArrayWindowStore] =
+    MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
+
+  /**
+   * Materialize a [[org.apache.kafka.streams.state.SessionStore]] using the provided [[SessionBytesStoreSupplier]].
+   *
+   * Important: Custom subclasses are allowed here, but they should respect the retention contract:
+   * Session stores are required to retain windows at least as long as (session inactivity gap + session grace period).
+   * Stores constructed via [[org.apache.kafka.streams.state.Stores]] already satisfy this contract.
+   *
+   * @tparam K         key type of the store
+   * @tparam V         value type of the store
+   * @param supplier   the [[SessionBytesStoreSupplier]] used to materialize the store
+   * @param keySerde   the key serde to use.
+   * @param valueSerde the value serde to use.
+   * @return a new [[Materialized]] instance with the given supplier
+   */
+  def as[K, V](supplier: SessionBytesStoreSupplier)(implicit keySerde: Serde[K],
+                                                    valueSerde: Serde[V]): MaterializedJ[K, V, ByteArraySessionStore] =
+    MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
+
+  /**
+   * Materialize a [[org.apache.kafka.streams.state.KeyValueStore]] using the provided [[KeyValueBytesStoreSupplier]].
+   *
+   * @tparam K         key type of the store
+   * @tparam V         value type of the store
+   * @param supplier   the [[KeyValueBytesStoreSupplier]] used to materialize the store
+   * @param keySerde   the key serde to use.
+   * @param valueSerde the value serde to use.
+   * @return a new [[Materialized]] instance with the given supplier
+   */
+  def as[K, V](
+    supplier: KeyValueBytesStoreSupplier
+  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, ByteArrayKeyValueStore] =
+    MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
new file mode 100644
index 0000000..351e0a5
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Produced => ProducedJ}
+import org.apache.kafka.streams.processor.StreamPartitioner
+
+object Produced {
+
+  /**
+   * Create a Produced instance with provided keySerde and valueSerde.
+   *
+   * @tparam K         key type
+   * @tparam V         value type
+   * @param keySerde   Serde to use for serializing the key
+   * @param valueSerde Serde to use for serializing the value
+   * @return A new [[Produced]] instance configured with keySerde and valueSerde
+   * @see KStream#through(String, Produced)
+   * @see KStream#to(String, Produced)
+   */
+  def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): ProducedJ[K, V] =
+    ProducedJ.`with`(keySerde, valueSerde)
+
+  /**
+   * Create a Produced instance with provided keySerde, valueSerde, and partitioner.
+   *
+   * @tparam K          key type
+   * @tparam V          value type
+   * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+   *                    if not specified and `keySerde` provides a
+   *                    [[org.apache.kafka.streams.kstream.internals.WindowedSerializer]] for the key
+   *                    [[org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner]] will be
+   *                    used&mdash;otherwise [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]
+   *                    will be used
+   * @param keySerde    Serde to use for serializing the key
+   * @param valueSerde  Serde to use for serializing the value
+   * @return A new [[Produced]] instance configured with keySerde, valueSerde, and partitioner
+   * @see KStream#through(String, Produced)
+   * @see KStream#to(String, Produced)
+   */
+  def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde: Serde[K],
+                                                         valueSerde: Serde[V]): ProducedJ[K, V] =
+    ProducedJ.`with`(keySerde, valueSerde, partitioner)
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala
new file mode 100644
index 0000000..f48d9bf
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Serialized.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Serialized => SerializedJ}
+
+object Serialized {
+
+  /**
+   * Construct a `Serialized` instance with the provided key and value [[Serde]]s.
+   * If the [[Serde]] params are `null` the default serdes defined in the configs will be used.
+   *
+   * @tparam K         the key type
+   * @tparam V         the value type
+   * @param keySerde   keySerde that will be used to materialize a stream
+   * @param valueSerde valueSerde that will be used to materialize a stream
+   * @return a new instance of [[Serialized]] configured with the provided serdes
+   */
+  def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): SerializedJ[K, V] =
+    SerializedJ.`with`(keySerde, valueSerde)
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index 9be5794..c54ba4f 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.scala.FunctionConversions._
 /**
  * Wraps the Java class TimeWindowedKStream and delegates method calls to the underlying Java object.
  *
- * @param [K] Type of keys
- * @param [V] Type of values
+ * @tparam K    Type of keys
+ * @tparam V    Type of values
  * @param inner The underlying Java abstraction for TimeWindowedKStream
  *
  * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream`
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
similarity index 61%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
copy to streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
index fb802ea..842dd79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SerializedInternal.java
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/package.scala
@@ -14,22 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals;
+package org.apache.kafka.streams.scala
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.Serialized;
-
-class SerializedInternal<K, V> extends Serialized<K, V> {
-    SerializedInternal(final Serialized<K, V> serialized) {
-        super(serialized);
-    }
-
-    Serde<K> keySerde() {
-        return keySerde;
-    }
-
-    Serde<V> valueSerde() {
-        return valueSerde;
-    }
+import org.apache.kafka.streams.processor.StateStore
 
+package object kstream {
+  type Materialized[K, V, S <: StateStore] = org.apache.kafka.streams.kstream.Materialized[K, V, S]
+  type Serialized[K, V] = org.apache.kafka.streams.kstream.Serialized[K, V]
+  type Consumed[K, V] = org.apache.kafka.streams.kstream.Consumed[K, V]
+  type Produced[K, V] = org.apache.kafka.streams.kstream.Produced[K, V]
+  type Joined[K, V, VO] = org.apache.kafka.streams.kstream.Joined[K, V, VO]
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index b596dd3..889e67c 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -21,9 +21,20 @@ package org.apache.kafka.streams.scala
 
 import java.util.regex.Pattern
 
-import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KStream => KStreamJ, KTable => KTableJ, _}
+import org.apache.kafka.streams.kstream.{
+  KeyValueMapper,
+  Reducer,
+  Transformer,
+  TransformerSupplier,
+  ValueJoiner,
+  ValueMapper,
+  KGroupedStream => KGroupedStreamJ,
+  KStream => KStreamJ,
+  KTable => KTableJ
+}
 import org.apache.kafka.streams.processor.ProcessorContext
 import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.Serdes._
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
 import org.junit.Assert._
@@ -153,10 +164,10 @@ class TopologyTest extends JUnitSuite {
       val builder: StreamsBuilderJ = new StreamsBuilderJ()
 
       val userClicksStream: KStreamJ[String, JLong] =
-        builder.stream[String, JLong](userClicksTopic, Consumed.`with`(Serdes.String, Serdes.JavaLong))
+        builder.stream[String, JLong](userClicksTopic, Consumed.`with`[String, JLong])
 
       val userRegionsTable: KTableJ[String, String] =
-        builder.table[String, String](userRegionsTopic, Consumed.`with`(Serdes.String, Serdes.String))
+        builder.table[String, String](userRegionsTopic, Consumed.`with`[String, String])
 
       // Join the stream against the table.
       val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream
@@ -166,7 +177,7 @@ class TopologyTest extends JUnitSuite {
             def apply(clicks: JLong, region: String): (String, JLong) =
               (if (region == null) "UNKNOWN" else region, clicks)
           },
-          Joined.`with`[String, JLong, String](Serdes.String, Serdes.JavaLong, Serdes.String)
+          Joined.`with`[String, JLong, String]
         )
 
       // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
@@ -180,7 +191,7 @@ class TopologyTest extends JUnitSuite {
 
       // Compute the total per region by summing the individual click counts per region.
       val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
-        .groupByKey(Serialized.`with`(Serdes.String, Serdes.JavaLong))
+        .groupByKey(Serialized.`with`[String, JLong])
         .reduce {
           new Reducer[JLong] {
             def apply(v1: JLong, v2: JLong) = v1 + v2
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index 9d821be..bbc84f6 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -29,7 +29,6 @@ import org.junit.rules.TemporaryFolder
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams._
 import org.apache.kafka.streams.scala.kstream._
-import org.apache.kafka.streams.kstream.Materialized
 import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
new file mode 100644
index 0000000..87b478c
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ConsumedTest.scala
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.Serdes
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class ConsumedTest extends FlatSpec with Matchers {
+
+  "Create a Consumed" should "create a Consumed with Serdes" in {
+    val consumed: Consumed[String, Long] = Consumed.`with`[String, Long] // no serde arguments are passed explicitly
+
+    val internalConsumed = new ConsumedInternal(consumed) // ConsumedInternal is used to read the configured values back
+    internalConsumed.keySerde shouldBe Serdes.String
+    internalConsumed.valueSerde shouldBe Serdes.Long
+  }
+
+  "Create a Consumed with timestampExtractor and resetPolicy" should "create a Consumed with Serdes, timestampExtractor and resetPolicy" in {
+    val timestampExtractor = new FailOnInvalidTimestamp()
+    val resetPolicy = Topology.AutoOffsetReset.LATEST
+    val consumed: Consumed[String, Long] =
+      Consumed.`with`[String, Long](timestampExtractor, resetPolicy) // only extractor and policy are passed explicitly
+
+    val internalConsumed = new ConsumedInternal(consumed)
+    internalConsumed.keySerde shouldBe Serdes.String
+    internalConsumed.valueSerde shouldBe Serdes.Long
+    internalConsumed.timestampExtractor shouldBe timestampExtractor
+    internalConsumed.offsetResetPolicy shouldBe resetPolicy
+  }
+
+  "Create a Consumed with timestampExtractor" should "create a Consumed with Serdes and timestampExtractor" in {
+    val timestampExtractor = new FailOnInvalidTimestamp()
+    val consumed: Consumed[String, Long] = Consumed.`with`[String, Long](timestampExtractor)
+
+    val internalConsumed = new ConsumedInternal(consumed)
+    internalConsumed.keySerde shouldBe Serdes.String
+    internalConsumed.valueSerde shouldBe Serdes.Long
+    internalConsumed.timestampExtractor shouldBe timestampExtractor
+  }
+
+  "Create a Consumed with resetPolicy" should "create a Consumed with Serdes and resetPolicy" in {
+    val resetPolicy = Topology.AutoOffsetReset.LATEST
+    val consumed: Consumed[String, Long] = Consumed.`with`[String, Long](resetPolicy)
+
+    val internalConsumed = new ConsumedInternal(consumed)
+    internalConsumed.keySerde shouldBe Serdes.String
+    internalConsumed.valueSerde shouldBe Serdes.Long
+    internalConsumed.offsetResetPolicy shouldBe resetPolicy
+  }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
similarity index 55%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
copy to streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
index 3197244..f9fcbb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/JoinedTest.scala
@@ -1,4 +1,6 @@
 /*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,26 +16,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+package org.apache.kafka.streams.scala.kstream
 
-public class ProducedInternal<K, V> extends Produced<K, V> {
-    ProducedInternal(final Produced<K, V> produced) {
-        super(produced);
-    }
+import org.apache.kafka.streams.scala.Serdes
+import org.apache.kafka.streams.scala.Serdes._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
 
-    public Serde<K> keySerde() {
-        return keySerde;
-    }
+@RunWith(classOf[JUnitRunner])
+class JoinedTest extends FlatSpec with Matchers {
 
-    public Serde<V> valueSerde() {
-        return valueSerde;
-    }
+  "Create a Joined" should "create a Joined with Serdes" in {
+    val joined: Joined[String, Long, Int] = Joined.`with`[String, Long, Int]
 
-    public StreamPartitioner<? super K, ? super V> streamPartitioner() {
-        return partitioner;
-    }
+    joined.keySerde shouldBe Serdes.String
+    joined.valueSerde shouldBe Serdes.Long
+    joined.otherValueSerde shouldBe Serdes.Integer
+  }
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
similarity index 98%
rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
rename to streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 3fdfee6..f339756 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -16,11 +16,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.scala
+package org.apache.kafka.streams.scala.kstream
 
 import org.apache.kafka.streams.kstream.JoinWindows
-import org.apache.kafka.streams.scala.Serdes._
 import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.StreamsBuilder
 import org.apache.kafka.streams.scala.utils.TestDriver
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
similarity index 95%
rename from streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
rename to streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index 2e9c821..dc080f1 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -16,12 +16,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.scala
+package org.apache.kafka.streams.scala.kstream
 
-import org.apache.kafka.streams.kstream.Materialized
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.Serdes._
 import org.apache.kafka.streams.scala.utils.TestDriver
+import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{FlatSpec, Matchers}
@@ -122,10 +122,7 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
     val sourceTopic2 = "source2"
     val sinkTopic = "sink"
     val stateStore = "store"
-    val materialized = Materialized
-      .as[String, Long, ByteArrayKeyValueStore](stateStore)
-      .withKeySerde(Serdes.String)
-      .withValueSerde(Serdes.Long)
+    val materialized = Materialized.as[String, Long, ByteArrayKeyValueStore](stateStore)
 
     val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count()
     val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count()
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/MaterializedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/MaterializedTest.scala
new file mode 100644
index 0000000..8d24efe
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/MaterializedTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala._
+import org.apache.kafka.streams.state.Stores
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class MaterializedTest extends FlatSpec with Matchers {
+  // Each case builds a Materialized without explicit serdes and inspects it through MaterializedInternal.
+  "Create a Materialized" should "create a Materialized with Serdes" in {
+    val materialized: Materialized[String, Long, ByteArrayKeyValueStore] =
+      Materialized.`with`[String, Long, ByteArrayKeyValueStore]
+
+    val internalMaterialized = new MaterializedInternal(materialized)
+    internalMaterialized.keySerde shouldBe Serdes.String
+    internalMaterialized.valueSerde shouldBe Serdes.Long
+  }
+
+  "Create a Materialized with a store name" should "create a Materialized with Serdes and a store name" in {
+    val storeName = "store"
+    val materialized: Materialized[String, Long, ByteArrayKeyValueStore] =
+      Materialized.as[String, Long, ByteArrayKeyValueStore](storeName)
+
+    val internalMaterialized = new MaterializedInternal(materialized)
+    internalMaterialized.keySerde shouldBe Serdes.String
+    internalMaterialized.valueSerde shouldBe Serdes.Long
+    internalMaterialized.storeName shouldBe storeName
+  }
+
+  "Create a Materialized with a window store supplier" should "create a Materialized with Serdes and a store supplier" in {
+    val storeSupplier = Stores.persistentWindowStore("store", 1, 1, true)
+    val materialized: Materialized[String, Long, ByteArrayWindowStore] =
+      Materialized.as[String, Long](storeSupplier)
+
+    val internalMaterialized = new MaterializedInternal(materialized)
+    internalMaterialized.keySerde shouldBe Serdes.String
+    internalMaterialized.valueSerde shouldBe Serdes.Long
+    internalMaterialized.storeSupplier shouldBe storeSupplier
+  }
+
+  "Create a Materialized with a key value store supplier" should "create a Materialized with Serdes and a store supplier" in {
+    val storeSupplier = Stores.persistentKeyValueStore("store")
+    val materialized: Materialized[String, Long, ByteArrayKeyValueStore] =
+      Materialized.as[String, Long](storeSupplier)
+
+    val internalMaterialized = new MaterializedInternal(materialized)
+    internalMaterialized.keySerde shouldBe Serdes.String
+    internalMaterialized.valueSerde shouldBe Serdes.Long
+    internalMaterialized.storeSupplier shouldBe storeSupplier
+  }
+
+  "Create a Materialized with a session store supplier" should "create a Materialized with Serdes and a store supplier" in {
+    val storeSupplier = Stores.persistentSessionStore("store", 1)
+    val materialized: Materialized[String, Long, ByteArraySessionStore] =
+      Materialized.as[String, Long](storeSupplier)
+
+    val internalMaterialized = new MaterializedInternal(materialized)
+    internalMaterialized.keySerde shouldBe Serdes.String
+    internalMaterialized.valueSerde shouldBe Serdes.Long
+    internalMaterialized.storeSupplier shouldBe storeSupplier
+  }
+}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala
new file mode 100644
index 0000000..7a248ab
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.streams.kstream.internals.ProducedInternal
+import org.apache.kafka.streams.processor.StreamPartitioner
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.Serdes
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class ProducedTest extends FlatSpec with Matchers {
+  // Each case builds a Produced without explicit serdes and checks what ProducedInternal exposes.
+  "Create a Produced" should "create a Produced with Serdes" in {
+    val produced: Produced[String, Long] = Produced.`with`[String, Long]
+
+    val internalProduced = new ProducedInternal(produced)
+    internalProduced.keySerde shouldBe Serdes.String
+    internalProduced.valueSerde shouldBe Serdes.Long
+  }
+
+  "Create a Produced with streamPartitioner" should "create a Produced with Serdes and streamPartitioner" in {
+    val partitioner = new StreamPartitioner[String, Long] {
+      override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0
+    }
+    val produced: Produced[String, Long] = Produced.`with`(partitioner)
+
+    val internalProduced = new ProducedInternal(produced)
+    internalProduced.keySerde shouldBe Serdes.String
+    internalProduced.valueSerde shouldBe Serdes.Long
+    internalProduced.streamPartitioner shouldBe partitioner
+  }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala
similarity index 51%
copy from streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
copy to streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala
index 3197244..8c072b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SerializedTest.scala
@@ -1,4 +1,6 @@
 /*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,26 +16,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+package org.apache.kafka.streams.scala.kstream
 
-public class ProducedInternal<K, V> extends Produced<K, V> {
-    ProducedInternal(final Produced<K, V> produced) {
-        super(produced);
-    }
+import org.apache.kafka.streams.kstream.internals.SerializedInternal
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.Serdes
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
 
-    public Serde<K> keySerde() {
-        return keySerde;
-    }
+@RunWith(classOf[JUnitRunner])
+class SerializedTest extends FlatSpec with Matchers {
 
-    public Serde<V> valueSerde() {
-        return valueSerde;
-    }
+  "Create a Serialized" should "create a Serialized with Serdes" in {
+    val serialized: Serialized[String, Long] = Serialized.`with`[String, Long]
 
-    public StreamPartitioner<? super K, ? super V> streamPartitioner() {
-        return partitioner;
-    }
+    val internalSerialized = new SerializedInternal(serialized)
+    internalSerialized.keySerde shouldBe Serdes.String
+    internalSerialized.valueSerde shouldBe Serdes.Long
+  }
 }


Mime
View raw message