kafka-commits mailing list archives

From damian...@apache.org
Subject kafka git commit: KAFKA-5816; add Produced class, KStream#to(topic, Produced), and KStream#through(topic, Produced)
Date Thu, 07 Sep 2017 07:54:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b041c8d87 -> 667cd60dc


KAFKA-5816; add Produced class, KStream#to(topic, Produced), and KStream#through(topic, Produced)

Add the `Produced` class and `KStream` overloads that use it:
`KStream#to(String, Produced)`
`KStream#through(String, Produced)`
Deprecate all other `to` and `through` methods except the single-parameter methods that take
only a topic.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3770 from dguy/kafka-5652-produced


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/667cd60d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/667cd60d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/667cd60d

Branch: refs/heads/trunk
Commit: 667cd60dc6ba68831423a256b6e455f7d955581c
Parents: b041c8d
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Sep 7 08:54:10 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Thu Sep 7 08:54:10 2017 +0100

----------------------------------------------------------------------
 docs/streams/developer-guide.html               |  18 +-
 .../apache/kafka/streams/kstream/KStream.java   |  41 +++++
 .../apache/kafka/streams/kstream/Produced.java  | 163 +++++++++++++++++++
 .../streams/kstream/internals/KStreamImpl.java  |  34 ++--
 .../kstream/internals/KStreamImplTest.java      |  47 ++++++
 5 files changed, 287 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 4da50a2..10220fb 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -1955,7 +1955,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
     </p>
 
     <pre class="brush: java;">
-    joined.to("topic4");
+        joined.to("topic4");
+        // or using custom Serdes and a StreamPartitioner
+        joined.to("topic5", Produced.with(keySerde, valueSerde, myStreamPartitioner));
     </pre>
 
     If your application needs to continue reading and processing the records after they have been materialized
@@ -1963,11 +1965,15 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
     Kafka Streams provides a convenience method called <code>through</code>:
 
     <pre class="brush: java;">
-    // equivalent to
-    //
-    // joined.to("topic4");
-    // materialized = builder.stream("topic4");
-    KStream&lt;String, String&gt; materialized = joined.through("topic4");
+        // equivalent to
+        //
+        // joined.to("topic4");
+        // materialized = builder.stream("topic4");
+        KStream&lt;String, String&gt; materialized = joined.through("topic4");
+        // if you need to provide serdes or a custom StreamPartitioner you can use
+        // the overloaded version
+        KStream&lt;String, String&gt; materialized = joined.through("topic5",
+                Produced.with(keySerde, valueSerde, myStreamPartitioner));
     </pre>
     <br>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index b8b5b8d..5a36cde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -675,7 +675,9 @@ public interface KStream<K, V> {
      *                    if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic       the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
+     * @deprecated use {@code through(String, Produced)}
      */
+    @Deprecated
     KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                           final String topic);
 
@@ -696,7 +698,9 @@ public interface KStream<K, V> {
      *                 if not specified the default value serde defined in the configuration will be used
      * @param topic    the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
+     * @deprecated use {@code through(String, Produced)}
      */
+    @Deprecated
     KStream<K, V> through(final Serde<K> keySerde,
                           final Serde<V> valSerde,
                           final String topic);
@@ -721,13 +725,33 @@ public interface KStream<K, V> {
      *                    be used
      * @param topic       the topic name
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
+     * @deprecated use {@code through(String, Produced)}
      */
+    @Deprecated
     KStream<K, V> through(final Serde<K> keySerde,
                           final Serde<V> valSerde,
                           final StreamPartitioner<? super K, ? super V> partitioner,
                           final String topic);
 
     /**
+     * Materialize this stream to a topic and create a new {@code KStream} from the topic using the
+     * {@link Produced} instance for configuration of the {@link Serde key serde}, {@link Serde value serde},
+     * and {@link StreamPartitioner}.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(String, Produced) to(someTopic, Produced.with(keySerde, valueSerde))}
+     * and {@link StreamsBuilder#stream(Serde, Serde, String...)
+     * StreamsBuilder#stream(keySerde, valSerde, someTopicName)}.
+     *
+     * @param topic    the topic name
+     * @param produced the options to use when producing to the topic
+     * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
+     */
+    KStream<K, V> through(final String topic,
+                          final Produced<K, V> produced);
+
+    /**
      * Materialize this stream to a topic using default serializers specified in the config and producer's
      * {@link DefaultPartitioner}.
      * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
@@ -746,7 +770,9 @@ public interface KStream<K, V> {
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used
      * @param topic       the topic name
+     * @deprecated use {@code to(String, Produced)}
      */
+    @Deprecated
     void to(final StreamPartitioner<? super K, ? super V> partitioner,
             final String topic);
 
@@ -762,7 +788,9 @@ public interface KStream<K, V> {
      * @param valSerde value serde used to send key-value pairs,
      *                 if not specified the default serde defined in the configs will be used
      * @param topic    the topic name
+     * @deprecated use {@code to(String, Produced)}
      */
+    @Deprecated
     void to(final Serde<K> keySerde,
             final Serde<V> valSerde,
             final String topic);
@@ -782,13 +810,26 @@ public interface KStream<K, V> {
      *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
      *                    be used
      * @param topic       the topic name
+     * @deprecated use {@code to(String, Produced)}
      */
+    @Deprecated
     void to(final Serde<K> keySerde,
             final Serde<V> valSerde,
             final StreamPartitioner<? super K, ? super V> partitioner,
             final String topic);
 
     /**
+     * Materialize this stream to a topic using the provided {@link Produced} instance.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     *
+     * @param topic       the topic name
+     * @param produced    the options to use when producing to the topic
+     */
+    void to(final String topic,
+            final Produced<K, V> produced);
+
+    /**
      * Transform each record of the input stream into zero or more records in the output stream (both key and value type
      * can be altered arbitrarily).
      * A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and

http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
new file mode 100644
index 0000000..488bd15
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
+import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+/**
+ * This class is used to provide the optional parameters when producing to new topics
+ * using {@link KStream#through(String, Produced)} or {@link KStream#to(String, Produced)}.
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class Produced<K, V> {
+
+    private Serde<K> keySerde;
+    private Serde<V> valueSerde;
+    private StreamPartitioner<? super K, ? super V> partitioner;
+
+    private Produced(final Serde<K> keySerde,
+                     final Serde<V> valueSerde,
+                     final StreamPartitioner<? super K, ? super V> partitioner) {
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        this.partitioner = partitioner;
+    }
+
+    /**
+     * Create a Produced instance with provided keySerde and valueSerde.
+     * @param keySerde      Serde to use for serializing the key
+     * @param valueSerde    Serde to use for serializing the value
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return  A new {@link Produced} instance configured with keySerde and valueSerde
+     * @see KStream#through(String, Produced)
+     * @see KStream#to(String, Produced)
+     */
+    public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
+                                             final Serde<V> valueSerde) {
+        return new Produced<>(keySerde, valueSerde, null);
+    }
+
+    /**
+     * Create a Produced instance with provided keySerde, valueSerde, and partitioner.
+     * @param keySerde      Serde to use for serializing the key
+     * @param valueSerde    Serde to use for serializing the value
+     * @param partitioner   the function used to determine how records are distributed among partitions of the topic,
+     *                      if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
+     *                      {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will be used
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return  A new {@link Produced} instance configured with keySerde, valueSerde, and partitioner
+     * @see KStream#through(String, Produced)
+     * @see KStream#to(String, Produced)
+     */
+    public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
+                                             final Serde<V> valueSerde,
+                                             final StreamPartitioner<? super K, ? super V> partitioner) {
+        return new Produced<>(keySerde, valueSerde, partitioner);
+    }
+
+    /**
+     * Create a Produced instance with provided keySerde.
+     * @param keySerde      Serde to use for serializing the key
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return  A new {@link Produced} instance configured with keySerde
+     * @see KStream#through(String, Produced)
+     * @see KStream#to(String, Produced)
+     */
+    public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde) {
+        return new Produced<>(keySerde, null, null);
+    }
+
+    /**
+     * Create a Produced instance with provided valueSerde.
+     * @param valueSerde    Serde to use for serializing the value
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return  A new {@link Produced} instance configured with valueSerde
+     * @see KStream#through(String, Produced)
+     * @see KStream#to(String, Produced)
+     */
+    public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde) {
+        return new Produced<>(null, valueSerde, null);
+    }
+
+    /**
+     * Create a Produced instance with provided partitioner.
+     * @param partitioner   the function used to determine how records are distributed among partitions of the topic,
+     *                      if not specified and the key serde provides a {@link WindowedSerializer} for the key
+     *                      {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will be used
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return  A new {@link Produced} instance configured with partitioner
+     * @see KStream#through(String, Produced)
+     * @see KStream#to(String, Produced)
+     */
+    public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
+        return new Produced<>(null, null, partitioner);
+    }
+
+    /**
+     * Produce records using the provided partitioner.
+     * @param partitioner   the function used to determine how records are distributed among partitions of the topic,
+     *                      if not specified and the key serde provides a {@link WindowedSerializer} for the key
+     *                      {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will be used
+     * @return this
+     */
+    public Produced<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
+        this.partitioner = partitioner;
+        return this;
+    }
+
+    /**
+     * Produce records using the provided valueSerde.
+     * @param valueSerde    Serde to use for serializing the value
+     * @return this
+     */
+    public Produced<K, V> withValueSerde(final Serde<V> valueSerde) {
+        this.valueSerde = valueSerde;
+        return this;
+    }
+
+    /**
+     * Produce records using the provided keySerde.
+     * @param keySerde    Serde to use for serializing the key
+     * @return this
+     */
+    public Produced<K, V> withKeySerde(final Serde<K> keySerde) {
+        this.keySerde = keySerde;
+        return this;
+    }
+
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    public StreamPartitioner<? super K, ? super V>  streamPartitioner() {
+        return partitioner;
+    }
+}
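
A note on calling the single-option factories above: the following is a minimal, self-contained sketch of the same shape (private constructor, static factory, fluent `with*` setter), not the Kafka class itself. `ProducedSketch`, `Options`, and the use of `java.util.function.Function` in place of `Serde` are hypothetical stand-ins chosen so the snippet compiles without Kafka on the classpath. It illustrates a Java type-inference detail that probably also applies when chaining from `Produced.keySerde(...)`: the second type parameter is not constrained by the factory argument, so an explicit type witness may be needed.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in types; this is not the Kafka Streams API.
final class ProducedSketch {

    // Same shape as Produced<K, V>: private constructor, static factory, fluent setter.
    static final class Options<K, V> {
        private Function<K, byte[]> keySerializer;   // null: fall back to the configured default
        private Function<V, byte[]> valueSerializer; // null: fall back to the configured default

        private Options(final Function<K, byte[]> keySerializer,
                        final Function<V, byte[]> valueSerializer) {
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }

        // Static factory that sets only the key serializer.
        static <K, V> Options<K, V> keySerializer(final Function<K, byte[]> keySerializer) {
            return new Options<>(keySerializer, null);
        }

        // Fluent setter: mutates this instance and returns it for chaining.
        Options<K, V> withValueSerializer(final Function<V, byte[]> valueSerializer) {
            this.valueSerializer = valueSerializer;
            return this;
        }

        Function<K, byte[]> keySerializer() {
            return keySerializer;
        }

        Function<V, byte[]> valueSerializer() {
            return valueSerializer;
        }
    }

    static Options<String, Long> example() {
        final Function<String, byte[]> keys = s -> s.getBytes(StandardCharsets.UTF_8);
        final Function<Long, byte[]> values = n -> Long.toString(n).getBytes(StandardCharsets.UTF_8);
        // The type witness fixes V up front. Without it, V is inferred as Object for the
        // receiver expression and withValueSerializer(values) does not compile.
        return Options.<String, Long>keySerializer(keys).withValueSerializer(values);
    }

    public static void main(final String[] args) {
        final Options<String, Long> options = example();
        System.out.println(options.keySerializer().apply("k").length);
        System.out.println(options.valueSerializer().apply(42L).length);
    }
}
```

When both serdes are known up front, the two-argument `with(keySerde, valueSerde)` factory avoids the witness because both type parameters are inferred from the arguments.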

http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 8534da8..8aa7c58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.PrintForeachAction;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -379,9 +380,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public KStream<K, V> through(final Serde<K> keySerde,
                                  final Serde<V> valSerde,
                                  final StreamPartitioner<? super K, ? super V> partitioner, String topic) {
-        to(keySerde, valSerde, partitioner, topic);
 
-        return builder.stream(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic);
+        return through(topic, Produced.with(keySerde, valSerde, partitioner));
+    }
+
+    @Override
+    public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
+        to(topic, produced);
+        return builder.stream(null, new FailOnInvalidTimestamp(), produced.keySerde(), produced.valueSerde(), topic);
     }
 
     @Override
@@ -406,13 +412,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public KStream<K, V> through(final Serde<K> keySerde,
                                  final Serde<V> valSerde,
                                  final String topic) {
-        return through(keySerde, valSerde, null, topic);
+        return through(topic, Produced.with(keySerde, valSerde));
     }
 
     @Override
     public KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                                  final String topic) {
-        return through(null, null, partitioner, topic);
+        return through(topic, Produced.streamPartitioner(partitioner));
     }
 
     @Override
@@ -422,20 +428,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public void to(final String topic) {
-        to(null, null, null, topic);
+        to(topic, Produced.<K, V>with(null, null, null));
     }
 
     @Override
     public void to(final StreamPartitioner<? super K, ? super V> partitioner,
                    final String topic) {
-        to(null, null, partitioner, topic);
+        to(topic, Produced.streamPartitioner(partitioner));
     }
 
     @Override
     public void to(final Serde<K> keySerde,
                    final Serde<V> valSerde,
                    final String topic) {
-        to(keySerde, valSerde, null, topic);
+        to(topic, Produced.with(keySerde, valSerde));
     }
 
     @SuppressWarnings("unchecked")
@@ -445,10 +451,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                    final StreamPartitioner<? super K, ? super V> partitioner,
                    final String topic) {
         Objects.requireNonNull(topic, "topic can't be null");
-        final String name = builder.newName(SINK_NAME);
+        to(topic, Produced.with(keySerde, valSerde, partitioner));
+    }
 
-        final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
-        final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
+    @SuppressWarnings("unchecked")
+    @Override
+    public void to(final String topic, final Produced<K, V> produced) {
+        Objects.requireNonNull(topic, "topic can't be null");
+        Objects.requireNonNull(produced, "Produced can't be null");
+        final String name = builder.newName(SINK_NAME);
+        final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
+        final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer();
+        final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner();
 
         if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
             final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
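
The partitioner fallback described in the `Produced` Javadoc and visible at the end of the hunk above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the `KStreamImpl` code: `PartitionerChoiceSketch` and its nested `Serializer`, `WindowedSerializer`, and `Partitioner` interfaces are hypothetical stand-ins, and `choose` returns a label instead of constructing a partitioner.

```java
// Hypothetical stand-in types; this is not the Kafka Streams API.
final class PartitionerChoiceSketch {

    interface Serializer<T> {
        byte[] serialize(T value);
    }

    // Marker subtype standing in for the windowed key serializer.
    interface WindowedSerializer<T> extends Serializer<T> {
    }

    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Decide which partitioner applies, following the order stated in the Javadoc:
    // an explicit partitioner wins; otherwise a windowed key serializer selects the
    // windowed partitioner; otherwise the producer default is used.
    static <K> String choose(final Partitioner<? super K> explicit,
                             final Serializer<K> keySerializer) {
        if (explicit != null) {
            return "explicit";
        }
        // instanceof evaluates to false for null, so no separate null check is needed here.
        if (keySerializer instanceof WindowedSerializer) {
            return "windowed";
        }
        return "default";
    }

    public static void main(final String[] args) {
        final Serializer<String> plain = s -> s.getBytes();
        final WindowedSerializer<String> windowed = s -> s.getBytes();
        final Partitioner<Object> custom = (key, numPartitions) -> 0;

        System.out.println(choose(custom, windowed));
        System.out.println(choose(null, windowed));
        System.out.println(choose(null, plain));
        System.out.println(choose(null, (Serializer<String>) null));
    }
}
```

A null key serializer falls through to the default branch, which matches the commit's convention that a null serde means the configured default is used.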

http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 1fed374..ca454f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -30,20 +30,24 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -57,6 +61,9 @@ public class KStreamImplTest {
     private KStream<String, String> testStream;
     private StreamsBuilder builder;
 
+    @Rule
+    public final KStreamTestDriver driver = new KStreamTestDriver();
+
     @Before
     public void before() {
         builder = new StreamsBuilder();
@@ -179,6 +186,33 @@ public class KStreamImplTest {
     }
 
     @Test
+    public void shouldSendDataThroughTopicUsingProduced() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String input = "topic";
+        final KStream<String, String> stream = builder.stream(stringSerde, stringSerde, input);
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
+
+        driver.setUp(builder);
+        driver.process(input, "a", "b");
+        assertThat(processorSupplier.processed, equalTo(Collections.singletonList("a:b")));
+    }
+
+    @Test
+    public void shouldSendDataToTopicUsingProduced() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String input = "topic";
+        final KStream<String, String> stream = builder.stream(stringSerde, stringSerde, input);
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        stream.to("to-topic", Produced.with(stringSerde, stringSerde));
+        builder.stream(stringSerde, stringSerde, "to-topic").process(processorSupplier);
+
+        driver.setUp(builder);
+        driver.process(input, "e", "f");
+        assertThat(processorSupplier.processed, equalTo(Collections.singletonList("e:f")));
+    }
+
+    @Test
     // TODO: this test should be refactored when we removed KStreamBuilder so that the created Topology contains internal topics as well
     public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
         final KStreamBuilder builder = new KStreamBuilder();
@@ -376,6 +410,18 @@ public class KStreamImplTest {
                         null);
     }
 
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnThroughWhenProducedIsNull() {
+        testStream.through("topic", null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnToWhenProducedIsNull() {
+        testStream.to("topic", null);
+    }
+
+
     @Test
     public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
         final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "blah");
@@ -411,4 +457,5 @@ public class KStreamImplTest {
     public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() {
         testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10), null);
     }
+
 }

