kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [kafka] branch 2.0 updated: MINOR: Return correct instance of SessionWindowSerde (#5546)
Date: Thu, 23 Aug 2018 20:42:05 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new dd00295  MINOR: Return correct instance of SessionWindowSerde (#5546)
dd00295 is described below

commit dd00295f7b9ae891fdaaadafa41f9a3cf96044a5
Author: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
AuthorDate: Fri Aug 24 02:09:35 2018 +0530

    MINOR: Return correct instance of SessionWindowSerde (#5546)
    
    Plus minor javadoc cleanups.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>
---
 .../java/org/apache/kafka/streams/Topology.java    | 29 +++++--------
 .../org/apache/kafka/streams/kstream/Consumed.java |  2 +-
 .../kafka/streams/kstream/KGroupedStream.java      |  8 ++--
 .../kafka/streams/kstream/KGroupedTable.java       | 23 +++++------
 .../org/apache/kafka/streams/kstream/KStream.java  |  8 ++--
 .../apache/kafka/streams/kstream/Transformer.java  |  6 +--
 .../kafka/streams/kstream/ValueTransformer.java    |  4 +-
 .../org/apache/kafka/streams/kstream/Window.java   |  4 +-
 .../kafka/streams/kstream/WindowedSerdes.java      |  2 +-
 .../kafka/streams/processor/internals/Task.java    |  2 +-
 .../kafka/streams/kstream/WindowedSerdesTest.java  | 47 ++++++++++++++++++++++
 11 files changed, 85 insertions(+), 50 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 753185c..8b2a46b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -289,7 +289,7 @@ public class Topology {
      * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
      * and/or sink nodes.
      * The source will use the specified key and value deserializers.
-     * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+     * The provided de-/serializers will be used for all the specified topics, so care should be taken when specifying
      * topics that share the same key-value data format.
      *
      * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets found;
@@ -412,8 +412,7 @@ public class Topology {
      * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
-     *                           or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@@ -445,8 +444,7 @@ public class Topology {
      * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
-     *                           or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@@ -474,8 +472,7 @@ public class Topology {
      * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
-     *                           or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@@ -505,8 +502,7 @@ public class Topology {
      * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
-     *                           or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
@@ -533,8 +529,7 @@ public class Topology {
      * @param parentNames       the name of one or more source or processor nodes whose output records this sink should consume
      *                          and dynamically write to topics
      * @return                  itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
-     *                           or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@@ -567,8 +562,7 @@ public class Topology {
      * @param parentNames       the name of one or more source or processor nodes whose output records this sink should consume
      *                          and dynamically write to topics
      * @return                  itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
-     *                           or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@@ -597,8 +591,7 @@ public class Topology {
      * @param parentNames       the name of one or more source or processor nodes whose output records this sink should consume
      *                          and dynamically write to topics
      * @return                  itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
-     *                           or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
@@ -629,8 +622,7 @@ public class Topology {
      * @param parentNames       the name of one or more source or processor nodes whose output records this sink should consume
      *                          and dynamically write to topics
      * @return                  itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
-     *                           or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
@@ -655,8 +647,7 @@ public class Topology {
      * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
      * and process
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name,
-     *                           or if this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      */
     public synchronized Topology addProcessor(final String name,
                                               final ProcessorSupplier supplier,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
index e2132ec..0af7dbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
@@ -97,7 +97,7 @@ public class Consumed<K, V> {
     /**
      * Create an instance of {@link Consumed} with key and value {@link Serde}s.
      *
-     * @param keySerde   the key serde. If {@code null}the default key serde from config will be used
+     * @param keySerde   the key serde. If {@code null} the default key serde from config will be used
      * @param valueSerde the value serde. If {@code null} the default value serde from config will be used
      * @param <K>        key type
      * @param <V>        value type
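
As a small illustration of the parameters documented in this hunk (a hedged sketch, not part of the commit; the topic name, types, and builder are assumptions): passing null for one serde falls back to the corresponding default from config, exactly as the corrected javadoc states.

    // Illustrative only (imports from org.apache.kafka.streams, org.apache.kafka.streams.kstream,
    // and org.apache.kafka.common.serialization assumed): explicit key serde, value serde left
    // null so the default value serde from StreamsConfig is used.
    final StreamsBuilder builder = new StreamsBuilder();
    final Consumed<String, Long> consumed = Consumed.with(Serdes.String(), null);
    final KStream<String, Long> input = builder.stream("input-topic", consumed);
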
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 53a2be7..7b69e03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -126,7 +126,7 @@ public interface KGroupedStream<K, V> {
      * aggregate and the record's value.
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max.
+     * Thus, {@code reduce(Reducer)} can be used to compute aggregate functions like sum, min, or max.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
      * the same key.
@@ -189,7 +189,7 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * KafkaStreams streams = ... // compute sum
      * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -271,7 +271,7 @@ public interface KGroupedStream<K, V> {
      * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
      * aggregate (or for the very first record using the intermediate aggregation result provided via the
      * {@link Initializer}) and the record's value.
-     * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
+     * Thus, {@code aggregate(Initializer, Aggregator, Materialized)} can be used to compute aggregate functions like
      * count (c.f. {@link #count()}).
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -286,7 +286,7 @@ public interface KGroupedStream<K, V> {
      * <pre>{@code
      * KafkaStreams streams = ... // some aggregation on value type double
      * String queryableStoreName = "storeName" // the store name should be the name of the store as defined by the Materialized instance
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-key";
      * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 0e26336..30f348c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -60,7 +60,7 @@ public interface KGroupedTable<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -89,7 +89,6 @@ public interface KGroupedTable<K, V> {
      * the same key into a new instance of {@link KTable}.
      * Records with {@code null} key are ignored.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
@@ -158,7 +157,7 @@ public interface KGroupedTable<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -191,7 +190,6 @@ public interface KGroupedTable<K, V> {
      * Combining implies that the type of the aggregate result is the same as the type of the input value
      * (c.f. {@link #aggregate(Initializer, Aggregator, Aggregator)}).
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * Each update to the original {@link KTable} results in a two step update of the result {@link KTable}.
@@ -202,7 +200,7 @@ public interface KGroupedTable<K, V> {
      * record from the aggregate.
      * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
      * value as-is.
-     * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.
+     * Thus, {@code reduce(Reducer, Reducer)} can be used to compute aggregate functions like sum.
      * For sum, the adder and subtractor would work as follows:
      * <pre>{@code
      * public class SumAdder implements Reducer<Integer> {
@@ -243,12 +241,12 @@ public interface KGroupedTable<K, V> {
 
     /**
      * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
-     * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
+     * mapped} to the same key into a new instance of {@link KTable}.
      * Records with {@code null} key are ignored.
      * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, Materialized) combining via reduce(...)} as it,
      * for example, allows the result to have a different type than the input values.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * provided by the given {@code storeSupplier}.
+     * that can be queried using the provided {@code queryableStoreName}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * The specified {@link Initializer} is applied once directly before the first input record is processed to
@@ -260,11 +258,11 @@ public interface KGroupedTable<K, V> {
      * The specified {@link Aggregator subtractor} is applied for each "replaced" record of the original {@link KTable}
      * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
      * record from the aggregate.
-     * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions
+     * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, Materialized)} can be used to compute aggregate functions
      * like sum.
      * For sum, the initializer, adder, and subtractor would work as follows:
      * <pre>{@code
-     * // in this example, LongSerde.class must be set as default value serde in StreamsConfig
+     * // in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde
      * public class SumInitializer implements Initializer<Long> {
      *   public Long apply() {
      *     return 0L;
@@ -277,7 +275,7 @@ public interface KGroupedTable<K, V> {
      *   }
      * }
      *
-     * public class SumSubstractor implements Aggregator<String, Integer, Long> {
+     * public class SumSubtractor implements Aggregator<String, Integer, Long> {
      *   public Long apply(String key, Integer oldValue, Long aggregate) {
      *     return aggregate - oldValue;
      *   }
@@ -294,7 +292,7 @@ public interface KGroupedTable<K, V> {
      * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
      * <pre>{@code
      * KafkaStreams streams = ... // counting words
-     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
      * String key = "some-word";
      * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
      * }</pre>
@@ -333,7 +331,6 @@ public interface KGroupedTable<K, V> {
      * If the result value type does not match the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value
      * serde} you should use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized)}.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
-     * provided by the given {@code storeSupplier}.
      * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
      * <p>
      * The specified {@link Initializer} is applied once directly before the first input record is processed to
@@ -362,7 +359,7 @@ public interface KGroupedTable<K, V> {
      *   }
      * }
      *
-     * public class SumSubstractor implements Aggregator<String, Integer, Long> {
+     * public class SumSubtractor implements Aggregator<String, Integer, Long> {
      *   public Long apply(String key, Integer oldValue, Long aggregate) {
      *     return aggregate - oldValue;
      *   }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index b6cc544..ae3b28a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -190,7 +190,7 @@ public interface KStream<K, V> {
      * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
      * This is a stateless record-by-record operation (cf.
-     * {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
+     * {@link #transformValues(ValueTransformerWithKeySupplier, String...)} for stateful value transformation).
      * <p>
      * The example below counts the number of tokens of key and value strings.
      * <pre>{@code
@@ -317,7 +317,7 @@ public interface KStream<K, V> {
      * stream (value type can be altered arbitrarily).
      * The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values.
      * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
-     * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
+     * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier, String...)}
      * for stateful value transformation).
      * <p>
      * The example below splits input records {@code <Integer:String>}, with key=1, containing sentences as values
@@ -440,8 +440,8 @@ public interface KStream<K, V> {
      * This is equivalent to calling {@link #to(String, Produced) to(someTopic, Produced.with(keySerde, valueSerde)}
      * and {@link StreamsBuilder#stream(String, Consumed) StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde))}.
      *
-     * @param topic
-     * @param produced
+     * @param topic     the topic name
+     * @param produced  the options to use when producing to the topic
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
      */
     KStream<K, V> through(final String topic,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 43b6115..0ab3469 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -52,7 +52,7 @@ public interface Transformer<K, V, R> {
      * Initialize this transformer.
      * This is called once per instance when the topology gets initialized.
      * When the framework is done with the transformer, {@link #close()} will be called on it; the
-     * framework may later re-use the transformer by calling {@link #init()} again.
+     * framework may later re-use the transformer by calling {@link #init(ProcessorContext)} again.
      * <p>
      * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
      * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be
@@ -73,7 +73,7 @@ public interface Transformer<K, V, R> {
      * <p>
      * If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)}
      * and {@link ProcessorContext#forward(Object, Object, To)} can be used.
-     * If not record should be forwarded downstream, {@code transform} can return {@code null}.
+     * If record should not be forwarded downstream, {@code transform} can return {@code null}.
      *
      * @param key the key for the record
      * @param value the value for the record
@@ -84,7 +84,7 @@ public interface Transformer<K, V, R> {
 
     /**
      * Close this transformer and clean up any resources. The framework may
-     * later re-use this transformer by calling {@link #init()} on it again.
+     * later re-use this transformer by calling {@link #init(ProcessorContext)} on it again.
      * <p>
      * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and
      * {@link ProcessorContext#forward(Object, Object, To)} can be used.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 866cce8..b02311b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -51,7 +51,7 @@ public interface ValueTransformer<V, VR> {
      * Initialize this transformer.
      * This is called once per instance when the topology gets initialized.
      * When the framework is done with the transformer, {@link #close()} will be called on it; the
-     * framework may later re-use the transformer by calling {@link #init()} again.
+     * framework may later re-use the transformer by calling {@link #init(ProcessorContext)} again.
      * <p>
      * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
      * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be
@@ -87,7 +87,7 @@ public interface ValueTransformer<V, VR> {
 
     /**
      * Close this transformer and clean up any resources. The framework may
-     * later re-use this transformer by calling {@link #init()} on it again.
+     * later re-use this transformer by calling {@link #init(ProcessorContext)} on it again.
      * <p>
      * It is not possible to return any new output records within {@code close()}.
      * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index 08540a1..f625068 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -103,8 +103,8 @@ public abstract class Window {
     @Override
     public String toString() {
         return "Window{" +
-            "start=" + startMs +
-            ", end=" + endMs +
+            "startMs=" + startMs +
+            ", endMs=" + endMs +
             '}';
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
index d0381c7..6a851a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
@@ -54,6 +54,6 @@ public class WindowedSerdes {
      * Construct a {@code SessionWindowedSerde} object for the specified inner class type.
      */
     static public <T> Serde<Windowed<T>> sessionWindowedSerdeFrom(final Class<T> type) {
-        return new TimeWindowedSerde<>(Serdes.serdeFrom(type));
+        return new SessionWindowedSerde<>(Serdes.serdeFrom(type));
     }
 }
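
For context, a minimal round-trip sketch of the factory fixed above (illustrative only, not part of the commit; the key type, window bounds, and topic name are assumptions). It mirrors the new WindowedSerdesTest further down: before this change the method returned a time-windowed serde, so the deserialized window came back as a TimeWindow and the round-trip equality failed.

    // Illustrative sketch (imports from org.apache.kafka.common.serialization and
    // org.apache.kafka.streams.kstream.internals assumed):
    final Serde<Windowed<String>> serde = WindowedSerdes.sessionWindowedSerdeFrom(String.class);
    final Windowed<String> key = new Windowed<>("user-1", new SessionWindow(0L, 10L));
    final byte[] bytes = serde.serializer().serialize("session-topic", key);
    final Windowed<String> restored = serde.deserializer().deserialize("session-topic", bytes);
    // With the fix, restored equals key; with the old TimeWindowedSerde it did not.
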
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 5f221e3..2b43640 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -27,7 +27,7 @@ import java.util.Set;
 
 public interface Task {
     /**
-     * Initialize the task and return {}true if the task is ready to run, i.e, it has not state stores
+     * Initialize the task and return {@code true} if the task is ready to run, i.e, it has not state stores
      * @return true if this task has no state stores that may need restoring.
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
new file mode 100644
index 0000000..4360d08
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class WindowedSerdesTest {
+
+    private final String topic = "sample";
+
+    @Test
+    public void testTimeWindowSerdeFrom() {
+        final Windowed<Integer> timeWindowed = new Windowed<>(10, new TimeWindow(0, Long.MAX_VALUE));
+        final Serde<Windowed<Integer>> timeWindowedSerde = WindowedSerdes.timeWindowedSerdeFrom(Integer.class);
+        final byte[] bytes = timeWindowedSerde.serializer().serialize(topic, timeWindowed);
+        final Windowed<Integer> windowed = timeWindowedSerde.deserializer().deserialize(topic, bytes);
+        Assert.assertEquals(timeWindowed, windowed);
+    }
+
+    @Test
+    public void testSessionWindowedSerdeFrom() {
+        final Windowed<Integer> sessionWindowed = new Windowed<>(10, new SessionWindow(0, 1));
+        final Serde<Windowed<Integer>> sessionWindowedSerde = WindowedSerdes.sessionWindowedSerdeFrom(Integer.class);
+        final byte[] bytes = sessionWindowedSerde.serializer().serialize(topic, sessionWindowed);
+        final Windowed<Integer> windowed = sessionWindowedSerde.deserializer().deserialize(topic, bytes);
+        Assert.assertEquals(sessionWindowed, windowed);
+    }
+
+}
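
Beyond the serializer/deserializer round trip the test covers, this serde typically matters when windowed results are read back from a topic. A hypothetical usage sketch (the topic name, key/value types, and surrounding StreamsBuilder are assumptions, not part of this commit):

    // Hypothetical sketch: consuming session-windowed counts with the serde returned by the
    // fixed sessionWindowedSerdeFrom factory (imports from org.apache.kafka.streams,
    // org.apache.kafka.streams.kstream, and org.apache.kafka.common.serialization assumed).
    final StreamsBuilder builder = new StreamsBuilder();
    final Serde<Windowed<String>> windowedKeySerde = WindowedSerdes.sessionWindowedSerdeFrom(String.class);
    final KStream<Windowed<String>, Long> counts =
        builder.stream("session-counts", Consumed.with(windowedKeySerde, Serdes.Long()));
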

