kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3936: Validate parameters as early as possible
Date Tue, 09 Aug 2016 18:31:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8abcece40 -> caa9bd0fc


KAFKA-3936: Validate parameters as early as possible

Added non-null checks to parameters supplied via the DSL and `TopologyBuilder`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Edward Ribeiro <edward.ribeiro@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #1711 from dguy/kafka-3936


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/caa9bd0f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/caa9bd0f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/caa9bd0f

Branch: refs/heads/trunk
Commit: caa9bd0fcd2fab4758791408e2b145532153910e
Parents: 8abcece
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Aug 9 11:31:04 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Aug 9 11:31:04 2016 -0700

----------------------------------------------------------------------
 .../kstream/internals/KGroupedStreamImpl.java   |  13 ++
 .../kstream/internals/KGroupedTableImpl.java    |   8 ++
 .../streams/kstream/internals/KStreamImpl.java  |  26 ++++
 .../streams/kstream/internals/KTableImpl.java   |  15 +++
 .../streams/processor/TopologyBuilder.java      |  21 +++-
 .../streams/kstream/KStreamBuilderTest.java     |  11 ++
 .../internals/KGroupedStreamImplTest.java       | 100 +++++++++++++++
 .../internals/KGroupedTableImplTest.java        |  76 ++++++++++++
 .../kstream/internals/KStreamImplTest.java      | 122 +++++++++++++++++++
 .../kstream/internals/KTableImplTest.java       |  86 +++++++++++++
 .../streams/processor/TopologyBuilderTest.java  |  65 ++++++++++
 .../StreamThreadStateStoreProviderTest.java     |   2 +-
 12 files changed, 540 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 51fd116..78e6a2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 
 import java.util.Collections;
+import java.util.Objects;
 import java.util.Set;
 
 public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> {
@@ -55,6 +56,8 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
     @Override
     public KTable<K, V> reduce(final Reducer<V> reducer,
                                final String storeName) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(storeName, "storeName can't be null");
         return doAggregate(
             new KStreamReduce<K, V>(storeName, reducer),
             REDUCE_NAME,
@@ -67,6 +70,9 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
     public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
                                                             Windows<W> windows,
                                                             final String storeName) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(windows, "windows can't be null");
+        Objects.requireNonNull(storeName, "storeName can't be null");
         return (KTable<Windowed<K>, V>) doAggregate(
             new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
             REDUCE_NAME,
@@ -79,6 +85,9 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
                                       final Aggregator<K, V, T> aggregator,
                                       final Serde<T> aggValueSerde,
                                       final String storeName) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(storeName, "storeName can't be null");
         return doAggregate(
             new KStreamAggregate<>(storeName, initializer, aggregator),
             AGGREGATE_NAME,
@@ -92,6 +101,10 @@ public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGrou
                                                                   final Windows<W> windows,
                                                                   final Serde<T> aggValueSerde,
                                                                   final String storeName) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(windows, "windows can't be null");
+        Objects.requireNonNull(storeName, "storeName can't be null");
         return (KTable<Windowed<K>, T>) doAggregate(
             new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
             AGGREGATE_NAME,

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 5039c04..6d6e8cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 
 import java.util.Collections;
+import java.util.Objects;
 
 /**
  * The implementation class of {@link KGroupedTable}.
@@ -65,6 +66,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
                                       Serde<T> aggValueSerde,
                                       String storeName) {
 
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(adder, "adder can't be null");
+        Objects.requireNonNull(subtractor, "subtractor can't be null");
+        Objects.requireNonNull(storeName, "storeName can't be null");
         ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(storeName, initializer, adder, subtractor);
         return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, storeName);
     }
@@ -121,6 +126,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
     public KTable<K, V> reduce(Reducer<V> adder,
                                Reducer<V> subtractor,
                                String storeName) {
+        Objects.requireNonNull(adder, "adder can't be null");
+        Objects.requireNonNull(subtractor, "subtractor can't be null");
+        Objects.requireNonNull(storeName, "storeName can't be null");
         ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(storeName, adder, subtractor);
         return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, storeName);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 1859503..4d39b18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -44,6 +44,7 @@ import java.io.PrintStream;
 import java.lang.reflect.Array;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 
 public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
@@ -104,6 +105,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public KStream<K, V> filter(Predicate<K, V> predicate) {
+        Objects.requireNonNull(predicate, "predicate can't be null");
         String name = topology.newName(FILTER_NAME);
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
@@ -113,6 +115,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public KStream<K, V> filterNot(final Predicate<K, V> predicate) {
+        Objects.requireNonNull(predicate, "predicate can't be null");
         String name = topology.newName(FILTER_NAME);
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
@@ -123,6 +126,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     @SuppressWarnings("unchecked")
     public <K1> KStream<K1, V> selectKey(final KeyValueMapper<K, V, K1> mapper) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
         return new KStreamImpl<>(topology, internalSelectKey(mapper), sourceNodes, true);
     }
 
@@ -139,6 +143,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
         String name = topology.newName(MAP_NAME);
 
         topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
@@ -149,6 +154,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
         String name = topology.newName(MAPVALUES_NAME);
 
         topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
@@ -200,6 +206,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
      */
     @Override
     public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
+        Objects.requireNonNull(filePath, "filePath can't be null");
         String name = topology.newName(PRINTING_NAME);
         streamName = (streamName == null) ? this.name : streamName;
         try {
@@ -216,6 +223,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
         String name = topology.newName(FLATMAP_NAME);
 
         topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
@@ -225,6 +233,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
         String name = topology.newName(FLATMAPVALUES_NAME);
 
         topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
@@ -235,6 +244,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     @SuppressWarnings("unchecked")
     public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
+        if (predicates.length == 0) {
+            throw new IllegalArgumentException("you must provide at least one predicate");
+        }
+        for (Predicate<K, V> predicate : predicates) {
+            Objects.requireNonNull(predicate, "predicates can't have null values");
+        }
         String branchName = topology.newName(BRANCH_NAME);
 
         topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
@@ -283,6 +298,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public void foreach(ForeachAction<K, V> action) {
+        Objects.requireNonNull(action, "action can't be null");
         String name = topology.newName(FOREACH_NAME);
 
         topology.addProcessor(name, new KStreamForeach<>(action), this.name);
@@ -321,6 +337,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @SuppressWarnings("unchecked")
     @Override
     public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+        Objects.requireNonNull(topic, "topic can't be null");
         String name = topology.newName(SINK_NAME);
 
         Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
@@ -336,6 +353,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
+        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
         String name = topology.newName(TRANSFORM_NAME);
 
         topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
@@ -346,6 +364,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
+        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
         String name = topology.newName(TRANSFORMVALUES_NAME);
 
         topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
@@ -430,6 +449,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                          Serde<V> thisValueSerde,
                                          Serde<V1> otherValueSerde,
                                          KStreamImplJoin join) {
+        Objects.requireNonNull(other, "other KStream can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+        Objects.requireNonNull(windows, "windows can't be null");
+
         KStreamImpl<K, V> joinThis = this;
         KStreamImpl<K, V1> joinOther = (KStreamImpl) other;
 
@@ -541,6 +564,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                           ValueJoiner<V, V1, R> joiner,
                                           Serde<K> keySerde,
                                           Serde<V> valueSerde) {
+        Objects.requireNonNull(other, "other KTable can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
 
         if (repartitionRequired) {
             KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
@@ -574,6 +599,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                    Serde<K1> keySerde,
                                                    Serde<V> valSerde) {
 
+        Objects.requireNonNull(selector, "selector can't be null");
         String selectName = internalSelectKey(selector);
         return new KGroupedStreamImpl<>(topology,
                                         selectName,

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index f4d4855..2f36183 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -113,6 +114,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public KTable<K, V> filter(Predicate<K, V> predicate) {
+        Objects.requireNonNull(predicate, "predicate can't be null");
         String name = topology.newName(FILTER_NAME);
         KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, false);
         topology.addProcessor(name, processorSupplier, this.name);
@@ -122,6 +124,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public KTable<K, V> filterNot(final Predicate<K, V> predicate) {
+        Objects.requireNonNull(predicate, "predicate can't be null");
         String name = topology.newName(FILTER_NAME);
         KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, true);
 
@@ -132,6 +135,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
+        Objects.requireNonNull(mapper);
         String name = topology.newName(MAPVALUES_NAME);
         KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper);
 
@@ -196,6 +200,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public void foreach(final ForeachAction<K, V> action) {
+        Objects.requireNonNull(action, "action can't be null");
         String name = topology.newName(FOREACH_NAME);
         KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() {
             @Override
@@ -212,6 +217,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                 StreamPartitioner<K, V> partitioner,
                                 String topic,
                                 final String storeName) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
         to(keySerde, valSerde, partitioner, topic);
 
         return topology.table(keySerde, valSerde, topic, storeName);
@@ -274,6 +280,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+        Objects.requireNonNull(other, "other can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
         String joinThisName = topology.newName(JOINTHIS_NAME);
@@ -297,6 +306,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+        Objects.requireNonNull(other, "other can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
         String joinThisName = topology.newName(OUTERTHIS_NAME);
@@ -320,6 +332,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+        Objects.requireNonNull(other, "other can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
         String joinThisName = topology.newName(LEFTTHIS_NAME);
@@ -345,6 +359,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                                   Serde<K1> keySerde,
                                                   Serde<V1> valueSerde) {
 
+        Objects.requireNonNull(selector, "selector can't be null");
         String selectName = topology.newName(SELECT_NAME);
 
         KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index b8851b4..8e3dc7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -278,10 +279,15 @@ public class TopologyBuilder {
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
     public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+        if (topics.length == 0) {
+            throw new TopologyBuilderException("You must provide at least one topic");
+        }
+        Objects.requireNonNull(name, "name must not be null");
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
         for (String topic : topics) {
+            Objects.requireNonNull(topic, "topic names cannot be null");
             if (sourceTopicNames.contains(topic))
                 throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
 
@@ -322,10 +328,8 @@ public class TopologyBuilder {
      */
 
     public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
-
-        if (topicPattern == null) {
-            throw new TopologyBuilderException("Pattern can't be null");
-        }
+        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
+        Objects.requireNonNull(name, "name can't be null");
 
         if (nodeFactories.containsKey(name)) {
             throw new TopologyBuilderException("Processor " + name + " is already added.");
@@ -435,6 +439,8 @@ public class TopologyBuilder {
      * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      */
     public synchronized final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
+        Objects.requireNonNull(name, "name must not be null");
+        Objects.requireNonNull(topic, "topic must not be null");
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
@@ -467,6 +473,8 @@ public class TopologyBuilder {
      * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      */
     public synchronized final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
+        Objects.requireNonNull(name, "name must not be null");
+        Objects.requireNonNull(supplier, "supplier must not be null");
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
@@ -495,6 +503,7 @@ public class TopologyBuilder {
      * @throws TopologyBuilderException if state store supplier is already added
      */
     public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
+        Objects.requireNonNull(supplier, "supplier can't be null");
         if (stateFactories.containsKey(supplier.name())) {
             throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
         }
@@ -528,6 +537,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public synchronized final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
+        Objects.requireNonNull(processorName, "processorName can't be null");
         if (stateStoreNames != null) {
             for (String stateStoreName : stateStoreNames) {
                 connectProcessorAndStateStore(processorName, stateStoreName);
@@ -584,6 +594,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public synchronized final TopologyBuilder addInternalTopic(String topicName) {
+        Objects.requireNonNull(topicName, "topicName can't be null");
         this.internalTopicNames.add(topicName);
 
         return this;
@@ -793,6 +804,7 @@ public class TopologyBuilder {
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
     public synchronized ProcessorTopology build(String applicationId, Integer topicGroupId) {
+        Objects.requireNonNull(applicationId, "applicationId can't be null");
         Set<String> nodeGroup;
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
@@ -910,6 +922,7 @@ public class TopologyBuilder {
      * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
      */
     public synchronized void setApplicationId(String applicationId) {
+        Objects.requireNonNull(applicationId, "applicationId can't be null");
         this.applicationId = applicationId;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index cdf28db..c776b8a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
@@ -88,4 +89,14 @@ public class KStreamBuilderTest {
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
     }
 
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
+        new KStreamBuilder().stream();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
+        new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null, null);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
new file mode 100644
index 0000000..a95d1fb
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockReducer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KGroupedStreamImplTest {
+
+    private KGroupedStream<String, String> groupedStream;
+
+    @Before
+    public void before() {
+        final KStream<String, String> stream = new KStreamBuilder().stream("topic");
+        groupedStream = stream.groupByKey();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullReducerOnReduce() throws Exception {
+        groupedStream.reduce(null, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreNameOnReduce() throws Exception {
+        groupedStream.reduce(MockReducer.STRING_ADDER, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullReducerWithWindowedReduce() throws Exception {
+        groupedStream.reduce(null, TimeWindows.of(10), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullWindowsWithWindowedReduce() throws Exception {
+        groupedStream.reduce(MockReducer.STRING_ADDER, null, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreNameWithWindowedReduce() throws Exception {
+        groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnAggregate() throws Exception {
+        groupedStream.aggregate(null, MockAggregator.STRING_ADDER, Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullAdderOnAggregate() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreNameOnAggregate() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, Serdes.String(), null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullInitializerOnWindowedAggregate() throws Exception {
+        groupedStream.aggregate(null, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullAdderOnWindowedAggregate() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, null, TimeWindows.of(10), Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullWindowsOnWindowedAggregate() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, null, Serdes.String(), "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotHaveNullStoreNameOnWindowedAggregate() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, TimeWindows.of(10), Serdes.String(), null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
new file mode 100644
index 0000000..0846066
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.KGroupedTable;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockKeyValueMapper;
+import org.apache.kafka.test.MockReducer;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Tests that {@code KGroupedTableImpl#aggregate} and {@code #reduce} validate their
+ * parameters eagerly: every null argument must raise a {@link NullPointerException}
+ * at the call site rather than surfacing later when the topology is built.
+ *
+ * NOTE(review): the committed file has no trailing newline (see the
+ * "\ No newline at end of file" marker in the diff) — worth adding one.
+ */
+public class KGroupedTableImplTest {
+
+    private KGroupedTable<String, String> groupedTable;
+
+    @Before
+    public void before() {
+        // Build a minimal grouped table; topic/store names are irrelevant to these
+        // tests since no topology is ever started.
+        final KStreamBuilder builder = new KStreamBuilder();
+        groupedTable = builder.table(Serdes.String(), Serdes.String(), "blah", "blah")
+                .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStoreNameOnAggregate() throws Exception {
+        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullInitializerOnAggregate() throws Exception {
+        groupedTable.aggregate(null, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullAdderOnAggregate() throws Exception {
+        groupedTable.aggregate(MockInitializer.STRING_INIT, null, MockAggregator.STRING_REMOVER, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullSubtractorOnAggregate() throws Exception {
+        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, null, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullAdderOnReduce() throws Exception {
+        groupedTable.reduce(null, MockReducer.STRING_REMOVER, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullSubtractorOnReduce() throws Exception {
+        groupedTable.reduce(MockReducer.STRING_ADDER, null, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStoreNameOnReduce() throws Exception {
+        groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index d5fc41b..9cbc156 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -37,6 +40,14 @@ public class KStreamImplTest {
 
     final private Serde<String> stringSerde = Serdes.String();
     final private Serde<Integer> intSerde = Serdes.Integer();
+    // Shared fixture for the null-argument tests below; rebuilt fresh per test by @Before.
+    private KStream<String, String> testStream;
+    private KStreamBuilder builder;
+
+    @Before
+    public void before() {
+        builder = new KStreamBuilder();
+        testStream = builder.stream("source");
+    }
 
     @Test
     public void testNumProcesses() {
@@ -141,4 +152,115 @@ public class KStreamImplTest {
         final KStream<String, String> inputStream = builder.stream(stringSerde, stringSerde, "input");
         inputStream.to(stringSerde, null, "output");
     }
+
+    // Each test below asserts that a null argument to a KStream DSL method fails
+    // fast with a NullPointerException at the call site (KAFKA-3936).
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullPredicateOnFilter() throws Exception {
+        testStream.filter(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullPredicateOnFilterNot() throws Exception {
+        testStream.filterNot(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnSelectKey() throws Exception {
+        testStream.selectKey(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnMap() throws Exception {
+        testStream.map(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnMapValues() throws Exception {
+        testStream.mapValues(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullFilePathOnWriteAsText() throws Exception {
+        testStream.writeAsText(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnFlatMap() throws Exception {
+        testStream.flatMap(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnFlatMapValues() throws Exception {
+        testStream.flatMapValues(null);
+    }
+
+    // NOTE(review): method name has a typo — "AtLeastOn" should read "AtLeastOne".
+    // branch() with zero predicates is an IllegalArgumentException, not an NPE.
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldHaveAtLeastOnPredicateWhenBranching() throws Exception {
+        testStream.branch();
+    }
+
+    // NOTE(review): ungrammatical name ("shouldCant..."); something like
+    // shouldNotAllowNullPredicateOnBranch would match the naming convention above.
+    @Test(expected = NullPointerException.class)
+    public void shouldCantHaveNullPredicate() throws Exception {
+        testStream.branch(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicOnThrough() throws Exception {
+        testStream.through(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicOnTo() throws Exception {
+        testStream.to(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTransformSupplierOnTransform() throws Exception {
+        testStream.transform(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTransformSupplierOnTransformValues() throws Exception {
+        testStream.transformValues(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProcessSupplier() throws Exception {
+        testStream.process(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullOtherStreamOnJoin() throws Exception {
+        testStream.join(null, MockValueJoiner.STRING_JOINER, JoinWindows.of(10));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullValueJoinerOnJoin() throws Exception {
+        testStream.join(testStream, null, JoinWindows.of(10));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullJoinWindowsOnJoin() throws Exception {
+        testStream.join(testStream, MockValueJoiner.STRING_JOINER, null);
+    }
+
+    // NOTE(review): raw-type cast; (KTable<String, String>) null would avoid the
+    // unchecked/raw-type warning while still selecting the KTable overload.
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTableOnTableJoin() throws Exception {
+        testStream.leftJoin((KTable) null, MockValueJoiner.STRING_JOINER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullValueMapperOnTableJoin() throws Exception {
+        testStream.leftJoin(builder.table(Serdes.String(), Serdes.String(), "topic", "store"), null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullSelectorOnGroupBy() throws Exception {
+        testStream.groupBy(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullActionOnForEach() throws Exception {
+        testStream.foreach(null);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 617a2a1..7edaac9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -52,6 +53,8 @@ public class KTableImplTest {
 
     private KStreamTestDriver driver = null;
     private File stateDir = null;
+    private KStreamBuilder builder;
+    private KTable<String, String> table;
 
     @After
     public void tearDown() {
@@ -64,6 +67,8 @@ public class KTableImplTest {
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
+        builder = new KStreamBuilder();
+        table = builder.table("test", "test");
     }
 
     @Test
@@ -356,4 +361,85 @@ public class KTableImplTest {
         assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
         assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
     }
+
+    // Each test below asserts that a null argument to a KTable DSL method fails
+    // fast with a NullPointerException at the call site (KAFKA-3936).
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullSelectorOnToStream() throws Exception {
+        table.toStream(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicOnTo() throws Exception {
+        table.to(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullPredicateOnFilter() throws Exception {
+        table.filter(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullPredicateOnFilterNot() throws Exception {
+        table.filterNot(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullMapperOnMapValues() throws Exception {
+        table.mapValues(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullFilePathOnWriteAsText() throws Exception {
+        table.writeAsText(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullActionOnForEach() throws Exception {
+        table.foreach(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicInThrough() throws Exception {
+        table.through(null, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullStoreInThrough() throws Exception {
+        table.through("topic", null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullSelectorOnGroupBy() throws Exception {
+        table.groupBy(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullOtherTableOnJoin() throws Exception {
+        table.join(null, MockValueJoiner.STRING_JOINER);
+    }
+
+    // NOTE(review): name is missing "On" — shouldNotAllowNullJoinerOnJoin would
+    // match the convention used by the sibling tests below.
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullJoinerJoin() throws Exception {
+        table.join(table, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullOtherTableOnOuterJoin() throws Exception {
+        table.outerJoin(null, MockValueJoiner.STRING_JOINER);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullJoinerOnOuterJoin() throws Exception {
+        table.outerJoin(table, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullJoinerOnLeftJoin() throws Exception {
+        table.leftJoin(table, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullOtherTableOnLeftJoin() throws Exception {
+        table.leftJoin(null, MockValueJoiner.STRING_JOINER);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 107d832..f6ca6db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -331,6 +331,71 @@ public class TopologyBuilderTest {
         assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
     }
 
+    // Each test below asserts that TopologyBuilder rejects null arguments eagerly
+    // with a NullPointerException instead of deferring the failure to build time.
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingSink() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSink(null, "topic");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicWhenAddingSink() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSink("name", null);
+    }
+
+    // NOTE(review): raw ProcessorSupplier/Processor types — a parameterized
+    // ProcessorSupplier<Object, Object> would avoid the raw-type warning here.
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addProcessor(null, new ProcessorSupplier() {
+            @Override
+            public Processor get() {
+                return null;
+            }
+        });
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProcessorSupplier() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addProcessor("name", null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingSource() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource(null, Pattern.compile(".*"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.connectProcessorAndStateStores(null, "store");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAddNullInternalTopic() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addInternalTopic(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullApplicationIdOnBuild() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.build(null, 1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotSetApplicationIdToNull() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAddNullStateStoreSupplier() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addStateStore(null, true);
+    }
+
     private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
         Set<String> nodeNames = new HashSet<>();
         for (ProcessorNode node : nodes) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/caa9bd0f/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 6563e26..a112a5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -65,7 +65,7 @@ public class StreamThreadStateStoreProviderTest {
     @Before
     public void before() throws IOException {
         final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("the-source");
+        builder.addSource("the-source", "the-source");
         builder.addProcessor("the-processor", new MockProcessorSupplier());
         builder.addStateStore(Stores.create("kv-store")
                                   .withStringKeys()


Mime
View raw message