kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6958: Allow to name operation using parameter classes (#6410)
Date Thu, 18 Apr 2019 22:45:46 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 075b368  KAFKA-6958: Allow to name operation using parameter classes (#6410)
075b368 is described below

commit 075b368d47ad3144518e4d425f9155e35f15f418
Author: Florian Hussonnois <fhussonnois@gmail.com>
AuthorDate: Fri Apr 19 00:45:34 2019 +0200

    KAFKA-6958: Allow to name operation using parameter classes (#6410)
    
    This is the 2nd PR for the KIP-307
    Reviewers: Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   |  16 +++-
 .../org/apache/kafka/streams/kstream/Consumed.java |  46 +++++++--
 .../org/apache/kafka/streams/kstream/Grouped.java  |   7 +-
 .../apache/kafka/streams/kstream/Materialized.java |   3 +-
 .../org/apache/kafka/streams/kstream/Named.java    |  83 ++++++++++++++++
 .../org/apache/kafka/streams/kstream/Printed.java  |  16 +++-
 .../org/apache/kafka/streams/kstream/Produced.java |  36 +++++--
 .../kstream/internals/ConsumedInternal.java        |   5 +
 .../kstream/internals/InternalNameProvider.java    |   4 +-
 .../kstream/internals/InternalStreamsBuilder.java  |   9 +-
 .../streams/kstream/internals/KStreamImpl.java     |  16 ++--
 .../streams/kstream/internals/NamedInternal.java   |  81 ++++++++++++++++
 .../streams/kstream/internals/PrintedInternal.java |   4 +
 .../kstream/internals/ProducedInternal.java        |   6 ++
 .../internals/suppress/SuppressedInternal.java     |  13 +--
 .../apache/kafka/streams/StreamsBuilderTest.java   | 105 ++++++++++++++++++---
 .../kafka/streams/kstream/MaterializedTest.java    |   4 +-
 .../apache/kafka/streams/kstream/NamedTest.java    |  48 ++++++++++
 .../kstream/internals/KGroupedStreamImplTest.java  |  14 +--
 .../kstream/internals/KGroupedTableImplTest.java   |   6 +-
 .../kstream/internals/NamedInternalTest.java       |  66 +++++++++++++
 21 files changed, 522 insertions(+), 66 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 9e89d7a..abb5aa1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -229,8 +229,10 @@ public class StreamsBuilder {
         Objects.requireNonNull(materialized, "materialized can't be null");
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
+
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
             new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
+
         return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
     }
 
@@ -280,8 +282,12 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
+
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
-            new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-");
+            new MaterializedInternal<>(
+                    Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
+                    internalStreamsBuilder, topic + "-");
+
         return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
     }
 
@@ -307,8 +313,10 @@ public class StreamsBuilder {
                                                   final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
+
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
             new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
+
         final ConsumedInternal<K, V> consumedInternal =
                 new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde()));
 
@@ -336,8 +344,11 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
+
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
-                new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-");
+            new MaterializedInternal<>(
+                Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
+                internalStreamsBuilder, topic + "-");
 
         return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal);
     }
@@ -403,6 +414,7 @@ public class StreamsBuilder {
         final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         // always use the serdes from consumed
         materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
+
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
             new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
index 667b621..423ca60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
@@ -50,21 +50,24 @@ import java.util.Objects;
  * @param <K> type of record key
  * @param <V> type of record value
  */
-public class Consumed<K, V> {
+public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
 
     protected Serde<K> keySerde;
     protected Serde<V> valueSerde;
     protected TimestampExtractor timestampExtractor;
     protected Topology.AutoOffsetReset resetPolicy;
+    protected String processorName;
 
     private Consumed(final Serde<K> keySerde,
                      final Serde<V> valueSerde,
                      final TimestampExtractor timestampExtractor,
-                     final Topology.AutoOffsetReset resetPolicy) {
+                     final Topology.AutoOffsetReset resetPolicy,
+                     final String processorName) {
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.timestampExtractor = timestampExtractor;
         this.resetPolicy = resetPolicy;
+        this.processorName = processorName;
     }
 
     /**
@@ -72,7 +75,12 @@ public class Consumed<K, V> {
      * @param consumed  the instance of {@link Consumed} to copy
      */
     protected Consumed(final Consumed<K, V> consumed) {
-        this(consumed.keySerde, consumed.valueSerde, consumed.timestampExtractor, consumed.resetPolicy);
+        this(consumed.keySerde,
+             consumed.valueSerde,
+             consumed.timestampExtractor,
+             consumed.resetPolicy,
+             consumed.processorName
+        );
     }
 
     /**
@@ -90,7 +98,7 @@ public class Consumed<K, V> {
                                              final Serde<V> valueSerde,
                                              final TimestampExtractor timestampExtractor,
                                              final Topology.AutoOffsetReset resetPolicy) {
-        return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy);
+        return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null);
 
     }
 
@@ -105,7 +113,7 @@ public class Consumed<K, V> {
      */
     public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
                                              final Serde<V> valueSerde) {
-        return new Consumed<>(keySerde, valueSerde, null, null);
+        return new Consumed<>(keySerde, valueSerde, null, null, null);
     }
 
     /**
@@ -117,7 +125,7 @@ public class Consumed<K, V> {
      * @return a new instance of {@link Consumed}
      */
     public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtractor) {
-        return new Consumed<>(null, null, timestampExtractor, null);
+        return new Consumed<>(null, null, timestampExtractor, null, null);
     }
 
     /**
@@ -129,7 +137,19 @@ public class Consumed<K, V> {
      * @return a new instance of {@link Consumed}
      */
     public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
-        return new Consumed<>(null, null, null, resetPolicy);
+        return new Consumed<>(null, null, null, resetPolicy, null);
+    }
+
+    /**
+     * Create an instance of {@link Consumed} with provided processor name.
+     *
+     * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
+     * @param <K>         key type
+     * @param <V>         value type
+     * @return a new instance of {@link Consumed}
+     */
+    public static <K, V> Consumed<K, V> as(final String processorName) {
+        return new Consumed<>(null, null, null, null, processorName);
     }
 
     /**
@@ -176,6 +196,18 @@ public class Consumed<K, V> {
         return this;
     }
 
+    /**
+     * Configure the instance of {@link Consumed} with a processor name.
+     *
+     * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
+     * @return this
+     */
+    @Override
+    public Consumed<K, V> withName(final String processorName) {
+        this.processorName = processorName;
+        return this;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
index 3380fc8..c196d93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
@@ -29,9 +29,9 @@ import org.apache.kafka.common.serialization.Serde;
  * @param <K> the key type
  * @param <V> the value type
  */
-public class Grouped<K, V> {
+public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {
 
-    protected final  Serde<K> keySerde;
+    protected final Serde<K> keySerde;
     protected final Serde<V> valueSerde;
     protected final String name;
 
@@ -128,9 +128,10 @@ public class Grouped<K, V> {
      * Perform the grouping operation with the name for a repartition topic if required.  Note
      * that Kafka Streams does not always create repartition topics for grouping operations.
      *
-     * @param name the name used as part of the repartition topic name if required
+     * @param name the name used for the processor name and as part of the repartition topic name if required
      * @return a new {@link Grouped} instance configured with the name
      * */
+    @Override
     public Grouped<K, V> withName(final String name) {
         return new Grouped<>(name, keySerde, valueSerde);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index 15bdf92..3804932 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.internals.ApiUtils;
@@ -100,7 +99,7 @@ public class Materialized<K, V, S extends StateStore> {
      * @return a new {@link Materialized} instance with the given storeName
      */
     public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) {
-        Topic.validate(storeName);
+        Named.validate(storeName);
         return new Materialized<>(storeName);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java
new file mode 100644
index 0000000..1db031a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.errors.TopologyException;
+
+import java.util.Objects;
+
+public class Named implements NamedOperation<Named> {
+
+    private static final int MAX_NAME_LENGTH = 249;
+
+    protected String name;
+
+    protected Named(final String name) {
+        this.name = name;
+        if (name != null) {
+            validate(name);
+        }
+    }
+
+    /**
+     * Create a Named instance with provided name.
+     *
+     * @param name  the processor name to be used. If {@code null} a default processor name will be generated.
+     * @return      A new {@link Named} instance configured with name
+     *
+     * @throws TopologyException if an invalid name is specified; valid characters are ASCII alphanumerics, '.', '_' and '-'.
+     */
+    public static Named as(final String name) {
+        Objects.requireNonNull(name, "name can't be null");
+        return new Named(name);
+    }
+
+    @Override
+    public Named withName(final String name) {
+        return new Named(name);
+    }
+
+    static void validate(final String name) {
+        if (name.isEmpty())
+            throw new TopologyException("Name is illegal, it can't be empty");
+        if (name.equals(".") || name.equals(".."))
+            throw new TopologyException("Name cannot be \".\" or \"..\"");
+        if (name.length() > MAX_NAME_LENGTH)
+            throw new TopologyException("Name is illegal, it can't be longer than " + MAX_NAME_LENGTH +
+                    " characters, name: " + name);
+        if (!containsValidPattern(name))
+            throw new TopologyException("Name \"" + name + "\" is illegal, it contains a character other than " +
+                    "ASCII alphanumerics, '.', '_' and '-'");
+    }
+
+    /**
+     * Valid characters for Kafka topics are the ASCII alphanumerics, '.', '_', and '-'
+     */
+    private static boolean containsValidPattern(final String topic) {
+        for (int i = 0; i < topic.length(); ++i) {
+            final char c = topic.charAt(i);
+
+            // We don't use Character.isLetterOrDigit(c) because it's slower
+            final boolean validLetterOrDigit = (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z');
+            final boolean validChar = validLetterOrDigit || c == '.' || c == '_' || c == '-';
+            if (!validChar) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
index 5002efc..53c00c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
@@ -31,9 +31,10 @@ import java.util.Objects;
  * @param <V> value type
  * @see KStream#print(Printed)
  */
-public class Printed<K, V> {
+public class Printed<K, V> implements NamedOperation<Printed<K, V>> {
     protected final OutputStream outputStream;
     protected String label;
+    protected String processorName;
     protected KeyValueMapper<? super K, ? super V, String> mapper = new KeyValueMapper<K, V, String>() {
         @Override
         public String apply(final K key, final V value) {
@@ -53,6 +54,7 @@ public class Printed<K, V> {
         this.outputStream = printed.outputStream;
         this.label = printed.label;
         this.mapper = printed.mapper;
+        this.processorName = printed.processorName;
     }
 
     /**
@@ -122,4 +124,16 @@ public class Printed<K, V> {
         this.mapper = mapper;
         return this;
     }
+
+    /**
+     * Print the records of a {@link KStream} with provided processor name.
+     *
+     * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
+     ** @return this
+     */
+    @Override
+    public Printed<K, V> withName(final String processorName) {
+        this.processorName = processorName;
+        return this;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
index a3d96bd..6c3ed8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -30,24 +30,28 @@ import java.util.Objects;
  * @param <K> key type
  * @param <V> value type
  */
-public class Produced<K, V> {
+public class Produced<K, V> implements NamedOperation<Produced<K, V>> {
 
     protected Serde<K> keySerde;
     protected Serde<V> valueSerde;
     protected StreamPartitioner<? super K, ? super V> partitioner;
+    protected String processorName;
 
     private Produced(final Serde<K> keySerde,
                      final Serde<V> valueSerde,
-                     final StreamPartitioner<? super K, ? super V> partitioner) {
+                     final StreamPartitioner<? super K, ? super V> partitioner,
+                     final String processorName) {
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.partitioner = partitioner;
+        this.processorName = processorName;
     }
 
     protected Produced(final Produced<K, V> produced) {
         this.keySerde = produced.keySerde;
         this.valueSerde = produced.valueSerde;
         this.partitioner = produced.partitioner;
+        this.processorName = produced.processorName;
     }
 
     /**
@@ -62,7 +66,7 @@ public class Produced<K, V> {
      */
     public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
                                              final Serde<V> valueSerde) {
-        return new Produced<>(keySerde, valueSerde, null);
+        return new Produced<>(keySerde, valueSerde, null, null);
     }
 
     /**
@@ -82,7 +86,19 @@ public class Produced<K, V> {
     public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
                                              final Serde<V> valueSerde,
                                              final StreamPartitioner<? super K, ? super V> partitioner) {
-        return new Produced<>(keySerde, valueSerde, partitioner);
+        return new Produced<>(keySerde, valueSerde, partitioner, null);
+    }
+
+    /**
+     * Create an instance of {@link Produced} with provided processor name.
+     *
+     * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
+     * @param <K>         key type
+     * @param <V>         value type
+     * @return a new instance of {@link Produced}
+     */
+    public static <K, V> Produced<K, V> as(final String processorName) {
+        return new Produced<>(null, null, null, processorName);
     }
 
     /**
@@ -95,7 +111,7 @@ public class Produced<K, V> {
      * @see KStream#to(String, Produced)
      */
     public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde) {
-        return new Produced<>(keySerde, null, null);
+        return new Produced<>(keySerde, null, null, null);
     }
 
     /**
@@ -108,7 +124,7 @@ public class Produced<K, V> {
      * @see KStream#to(String, Produced)
      */
     public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde) {
-        return new Produced<>(null, valueSerde, null);
+        return new Produced<>(null, valueSerde, null, null);
     }
 
     /**
@@ -123,7 +139,7 @@ public class Produced<K, V> {
      * @see KStream#to(String, Produced)
      */
     public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
-        return new Produced<>(null, null, partitioner);
+        return new Produced<>(null, null, partitioner, null);
     }
 
     /**
@@ -176,4 +192,10 @@ public class Produced<K, V> {
     public int hashCode() {
         return Objects.hash(keySerde, valueSerde, partitioner);
     }
+
+    @Override
+    public Produced<K, V> withName(final String name) {
+        this.processorName = name;
+        return this;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
index f33b0d8..94ceaff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
 public class ConsumedInternal<K, V> extends Consumed<K, V> {
+
     public ConsumedInternal(final Consumed<K, V> consumed) {
         super(consumed);
     }
@@ -62,4 +63,8 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
     public Topology.AutoOffsetReset offsetResetPolicy() {
         return resetPolicy;
     }
+
+    public String name() {
+        return processorName;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java
index 8d8ebfc..bc35d68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 public interface InternalNameProvider {
-    String newProcessorName(String prefix);
+    String newProcessorName(final String prefix);
 
-    String newStoreName(String prefix);
+    String newStoreName(final String prefix);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 920f213..960b030 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -79,7 +79,8 @@ public class InternalStreamsBuilder implements InternalNameProvider {
 
     public <K, V> KStream<K, V> stream(final Collection<String> topics,
                                        final ConsumedInternal<K, V> consumed) {
-        final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
+
+        final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
         final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name, topics, consumed);
 
         addGraphNode(root, streamSourceNode);
@@ -112,8 +113,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
                                      final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
-        final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
-        final String tableSourceName = newProcessorName(KTableImpl.SOURCE_NAME);
+        final String sourceName = new NamedInternal(consumed.name())
+                .orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
+        final String tableSourceName = new NamedInternal(consumed.name())
+                .suffixWithOrElseGet("-table-source", () -> newProcessorName(KTableImpl.SOURCE_NAME));
         final KTableSource<K, V> tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
         final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 75df058..e32db69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -168,7 +168,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
 
-        final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper);
+        final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper, NamedInternal.empty());
 
         selectKeyProcessorNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode);
@@ -178,9 +178,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
 
-    private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
-        final String name = builder.newProcessorName(KEY_SELECT_NAME);
-
+    private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
+                                                            final NamedInternal named) {
+        final String name = named.orElseGenerateWithPrefix(builder, KEY_SELECT_NAME);
         final KStreamMap<K, V, KR, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value));
 
         final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
@@ -241,8 +241,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     public void print(final Printed<K, V> printed) {
         Objects.requireNonNull(printed, "printed can't be null");
         final PrintedInternal<K, V> printedInternal = new PrintedInternal<>(printed);
-        final String name = builder.newProcessorName(PRINTING_NAME);
-
+        final String name = new NamedInternal(printedInternal.name()).orElseGenerateWithPrefix(builder, PRINTING_NAME);
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(printedInternal.build(this.name), name);
         final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name, processorParameters);
 
@@ -428,8 +427,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
     }
 
     private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
-        final String name = builder.newProcessorName(SINK_NAME);
-
+        final String name = new NamedInternal(produced.name()).orElseGenerateWithPrefix(builder, SINK_NAME);
         final StreamSinkNode<K, V> sinkNode = new StreamSinkNode<>(
             name,
             topicExtractor,
@@ -852,7 +850,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
         Objects.requireNonNull(selector, "selector can't be null");
         Objects.requireNonNull(grouped, "grouped can't be null");
         final GroupedInternal<KR, V> groupedInternal = new GroupedInternal<>(grouped);
-        final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(selector);
+        final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(selector, new NamedInternal(groupedInternal.name()));
         selectKeyMapNode.keyChangingOperation(true);
 
         builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
new file mode 100644
index 0000000..e83728e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Named;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public class NamedInternal extends Named {
+
+    public static NamedInternal empty() {
+        return new NamedInternal(null);
+    }
+
+    public static NamedInternal with(final String name) {
+        return new NamedInternal(name);
+    }
+
+    /**
+     * Creates a new {@link NamedInternal} instance.
+     *
+     * @param internal  the internal name.
+     */
+    NamedInternal(final String internal) {
+        super(internal);
+    }
+
+    /**
+     * @return  a string name.
+     */
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public NamedInternal withName(final String name) {
+        return new NamedInternal(name);
+    }
+
+    /**
+     * Check whether an internal name is defined.
+     * @return {@code false} if no name is set.
+     */
+    public boolean isDefined() {
+        return name != null;
+    }
+
+    String suffixWithOrElseGet(final String suffix, final Supplier<String> supplier) {
+        final Optional<String> suffixed = Optional.ofNullable(this.name).map(s -> s + suffix);
+        // Creating a new named will re-validate generated name as suffixed string could be too large.
+        return new NamedInternal(suffixed.orElseGet(supplier)).name();
+    }
+
+    String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
+        return orElseGet(() -> provider.newProcessorName(prefix));
+    }
+
+    /**
+     * Returns the internal name or the value returns from the supplier.
+     *
+     * @param supplier  the supplier to be used if internal name is empty.
+     * @return an internal string name.
+     */
+    private String orElseGet(final Supplier<String> supplier) {
+        return Optional.ofNullable(this.name).orElseGet(supplier);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
index fe961ad..546e353 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
@@ -27,4 +27,8 @@ public class PrintedInternal<K, V> extends Printed<K, V> {
     public ProcessorSupplier<K, V> build(final String processorName) {
         return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName));
     }
+
+    public String name() {
+        return processorName;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
index 358982b..0f0620c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
 public class ProducedInternal<K, V> extends Produced<K, V> {
+
     public ProducedInternal(final Produced<K, V> produced) {
         super(produced);
     }
@@ -36,4 +37,9 @@ public class ProducedInternal<K, V> extends Produced<K, V> {
     public StreamPartitioner<? super K, ? super V> streamPartitioner() {
         return partitioner;
     }
+
+    public String name() {
+        return processorName;
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
index 1540b2d..d24988d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
@@ -106,11 +106,12 @@ public class SuppressedInternal<K> implements Suppressed<K>, NamedSuppressed<K>
 
     @Override
     public String toString() {
-        return "SuppressedInternal{name='" + name + '\'' +
-            ", bufferConfig=" + bufferConfig +
-            ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
-            ", timeDefinition=" + timeDefinition +
-            ", safeToDropTombstones=" + safeToDropTombstones +
-            '}';
+        return "SuppressedInternal{" +
+                "name='" + name + '\'' +
+                ", bufferConfig=" + bufferConfig +
+                ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
+                ", timeDefinition=" + timeDefinition +
+                ", safeToDropTombstones=" + safeToDropTombstones +
+                '}';
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index f140546..669cece 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -22,11 +22,15 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -35,11 +39,13 @@ import org.apache.kafka.test.MockPredicate;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -51,7 +57,14 @@ import static org.junit.Assert.assertTrue;
 
 public class StreamsBuilderTest {
 
+    private static final String STREAM_TOPIC     = "stream-topic";
+
+    private static final String STREAM_TOPIC_TWO = "stream-topic-two";
+
+    private static final String TABLE_TOPIC      = "table-topic";
+
     private final StreamsBuilder builder = new StreamsBuilder();
+
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
     @Test
@@ -65,10 +78,10 @@ public class StreamsBuilderTest {
     @Test
     public void shouldAllowJoinUnmaterializedFilteredKTable() {
         final KTable<Bytes, String> filteredKTable = builder
-            .<Bytes, String>table("table-topic")
+            .<Bytes, String>table(TABLE_TOPIC)
             .filter(MockPredicate.allGoodPredicate());
         builder
-            .<Bytes, String>stream("stream-topic")
+            .<Bytes, String>stream(STREAM_TOPIC)
             .join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
@@ -88,10 +101,10 @@ public class StreamsBuilderTest {
     @Test
     public void shouldAllowJoinMaterializedFilteredKTable() {
         final KTable<Bytes, String> filteredKTable = builder
-            .<Bytes, String>table("table-topic")
+            .<Bytes, String>table(TABLE_TOPIC)
             .filter(MockPredicate.allGoodPredicate(), Materialized.as("store"));
         builder
-            .<Bytes, String>stream("stream-topic")
+            .<Bytes, String>stream(STREAM_TOPIC)
             .join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
@@ -112,10 +125,10 @@ public class StreamsBuilderTest {
     @Test
     public void shouldAllowJoinUnmaterializedMapValuedKTable() {
         final KTable<Bytes, String> mappedKTable = builder
-            .<Bytes, String>table("table-topic")
+            .<Bytes, String>table(TABLE_TOPIC)
             .mapValues(MockMapper.noOpValueMapper());
         builder
-            .<Bytes, String>stream("stream-topic")
+            .<Bytes, String>stream(STREAM_TOPIC)
             .join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
@@ -135,10 +148,10 @@ public class StreamsBuilderTest {
     @Test
     public void shouldAllowJoinMaterializedMapValuedKTable() {
         final KTable<Bytes, String> mappedKTable = builder
-            .<Bytes, String>table("table-topic")
+            .<Bytes, String>table(TABLE_TOPIC)
             .mapValues(MockMapper.noOpValueMapper(), Materialized.as("store"));
         builder
-            .<Bytes, String>stream("stream-topic")
+            .<Bytes, String>stream(STREAM_TOPIC)
             .join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
@@ -161,7 +174,7 @@ public class StreamsBuilderTest {
         final KTable<Bytes, String> table1 = builder.table("table-topic1");
         final KTable<Bytes, String> table2 = builder.table("table-topic2");
         builder
-            .<Bytes, String>stream("stream-topic")
+            .<Bytes, String>stream(STREAM_TOPIC)
             .join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
@@ -183,7 +196,7 @@ public class StreamsBuilderTest {
         final KTable<Bytes, String> table1 = builder.table("table-topic1");
         final KTable<Bytes, String> table2 = builder.table("table-topic2");
         builder
-            .<Bytes, String>stream("stream-topic")
+            .<Bytes, String>stream(STREAM_TOPIC)
             .join(
                 table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")),
                 MockValueJoiner.TOSTRING_JOINER);
@@ -205,8 +218,8 @@ public class StreamsBuilderTest {
 
     @Test
     public void shouldAllowJoinMaterializedSourceKTable() {
-        final KTable<Bytes, String> table = builder.table("table-topic");
-        builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
+        final KTable<Bytes, String> table = builder.table(TABLE_TOPIC);
+        builder.<Bytes, String>stream(STREAM_TOPIC).join(table, MockValueJoiner.TOSTRING_JOINER);
         builder.build();
 
         final ProcessorTopology topology =
@@ -403,4 +416,72 @@ public class StreamsBuilderTest {
         builder.stream(Arrays.asList(null, null));
         builder.build();
     }
+
+    @Test
+    public void shouldUseSpecifiedNameForStreamSourceProcessor() {
+        final String expected = "source-node";
+        builder.stream(STREAM_TOPIC, Consumed.as(expected));
+        builder.stream(STREAM_TOPIC_TWO);
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, expected, "KSTREAM-SOURCE-0000000000");
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForTableSourceProcessor() {
+        final String expected = "source-node";
+        builder.table(STREAM_TOPIC, Consumed.as(expected));
+        builder.table(STREAM_TOPIC_TWO);
+        builder.build();
+
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+
+        assertSpecifiedNameForOperation(
+                topology,
+                expected,
+                expected + "-table-source",
+                "KSTREAM-SOURCE-0000000002",
+                "KTABLE-SOURCE-0000000003");
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForGlobalTableSourceProcessor() {
+        final String expected = "source-processor";
+        builder.globalTable(STREAM_TOPIC, Consumed.as(expected));
+        builder.globalTable(STREAM_TOPIC_TWO);
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+
+        assertSpecifiedNameForStateStore(
+                topology.globalStateStores(),
+                "stream-topic-STATE-STORE-0000000000",
+                "stream-topic-two-STATE-STORE-0000000003"
+        );
+    }
+
+    @Test
+    public void shouldUseSpecifiedNameForSinkProcessor() {
+        final String expected = "sink-processor";
+        final KStream<Object, Object> stream = builder.stream(STREAM_TOPIC);
+        stream.to(STREAM_TOPIC_TWO, Produced.as(expected));
+        stream.to(STREAM_TOPIC_TWO);
+        builder.build();
+        final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
+        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", expected, "KSTREAM-SINK-0000000001");
+    }
+
+    private void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) {
+        final List<ProcessorNode> processors = topology.processors();
+        Assert.assertEquals("Invalid number of expected processors", expected.length, processors.size());
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], processors.get(i).name());
+        }
+    }
+
+    private void assertSpecifiedNameForStateStore(final List<StateStore> stores, final String... expected) {
+        Assert.assertEquals("Invalid number of expected state stores", expected.length, stores.size());
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], stores.get(i).name());
+        }
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java
index de3e503..8434e0f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@@ -32,7 +32,7 @@ public class MaterializedTest {
         Materialized.as("valid_name");
     }
 
-    @Test(expected = InvalidTopicException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotAllowInvalidTopicNames() {
         Materialized.as("not:valid");
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/NamedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/NamedTest.java
new file mode 100644
index 0000000..d959ec5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/NamedTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.errors.TopologyException;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.fail;
+
+public class NamedTest {
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowExceptionGivenNullName() {
+        Named.as(null);
+    }
+
+    @Test
+    public void shouldThrowExceptionOnInvalidTopicNames() {
+        final char[] longString = new char[250];
+        Arrays.fill(longString, 'a');
+        final String[] invalidNames = {"", "foo bar", "..", "foo:bar", "foo=bar", ".", new String(longString)};
+
+        for (final String name : invalidNames) {
+            try {
+                Named.validate(name);
+                fail("No exception was thrown for named with invalid name: " + name);
+            } catch (final TopologyException e) {
+                // success
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 9bdea13..acdbb39 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -18,13 +18,13 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KGroupedStream;
@@ -85,7 +85,7 @@ public class KGroupedStreamImplTest {
         groupedStream.reduce(null);
     }
 
-    @Test(expected = InvalidTopicException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotHaveInvalidStoreNameOnReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
     }
@@ -102,7 +102,7 @@ public class KGroupedStreamImplTest {
         groupedStream.windowedBy((Windows) null);
     }
 
-    @Test(expected = InvalidTopicException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
         groupedStream
             .windowedBy(TimeWindows.of(ofMillis(10)))
@@ -119,7 +119,7 @@ public class KGroupedStreamImplTest {
         groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store"));
     }
 
-    @Test(expected = InvalidTopicException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotHaveInvalidStoreNameOnAggregate() {
         groupedStream.aggregate(
             MockInitializer.STRING_INIT,
@@ -146,7 +146,7 @@ public class KGroupedStreamImplTest {
         groupedStream.windowedBy((Windows) null);
     }
 
-    @Test(expected = InvalidTopicException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
         groupedStream
             .windowedBy(TimeWindows.of(ofMillis(10)))
@@ -284,7 +284,7 @@ public class KGroupedStreamImplTest {
         groupedStream.windowedBy((SessionWindows) null);
     }
 
-    @Test(expected = InvalidTopicException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
         groupedStream
             .windowedBy(SessionWindows.with(ofMillis(30)))
@@ -349,7 +349,7 @@ public class KGroupedStreamImplTest {
                 Materialized.with(Serdes.String(), Serdes.String()));
     }
 
-    @Test(expected = InvalidTopicException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
         groupedStream
             .windowedBy(SessionWindows.with(ofMillis(10)))
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 09f93e7..34002ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.DoubleSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -24,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KGroupedTable;
@@ -64,7 +64,7 @@ public class KGroupedTableImplTest {
             .groupBy(MockMapper.selectValueKeyValueMapper());
     }
 
-    @Test(expected = InvalidTopicException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotAllowInvalidStoreNameOnAggregate() {
         groupedTable.aggregate(
             MockInitializer.STRING_INIT,
@@ -116,7 +116,7 @@ public class KGroupedTableImplTest {
             Materialized.as("store"));
     }
 
-    @Test(expected = InvalidTopicException.class)
+    @Test(expected = TopologyException.class)
     public void shouldNotAllowInvalidStoreNameOnReduce() {
         groupedTable.reduce(
             MockReducer.STRING_ADDER,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java
new file mode 100644
index 0000000..98b3a4d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class NamedInternalTest {
+
+    private static final String TEST_VALUE  = "default-value";
+    private static final String TEST_SUFFIX = "-suffix";
+
+    @Test
+    public void shouldSuffixNameOrReturnProviderValue() {
+        final String name = "foo";
+        assertEquals(
+                name + TEST_SUFFIX,
+                NamedInternal.with(name).suffixWithOrElseGet(TEST_SUFFIX, () -> TEST_VALUE)
+        );
+        assertEquals(
+                TEST_VALUE,
+                NamedInternal.with(null).suffixWithOrElseGet(TEST_SUFFIX, () -> TEST_VALUE)
+        );
+    }
+
+    @Test
+    public void shouldGenerateWithPrefixGivenEmptyName() {
+        final String prefix = "KSTREAM-MAP-";
+        assertEquals(prefix + "PROCESSOR-NAME", NamedInternal.with(null).orElseGenerateWithPrefix(
+                new InternalNameProvider() {
+                    @Override
+                    public String newProcessorName(final String prefix) {
+                        return prefix + "PROCESSOR-NAME";
+                    }
+
+                    @Override
+                    public String newStoreName(final String prefix) {
+                        return null;
+                    }
+                },
+                prefix)
+        );
+    }
+
+    @Test
+    public void shouldNotGenerateWithPrefixGivenValidName() {
+        final String validName = "validName";
+        assertEquals(validName, NamedInternal.with(validName).orElseGenerateWithPrefix(null, "KSTREAM-MAP-")
+        );
+    }
+}
\ No newline at end of file


Mime
View raw message