kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4607: Validate the names of auto-generated internal topics
Date Fri, 17 Mar 2017 19:03:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ec8feb766 -> 9e787716b


KAFKA-4607: Validate the names of auto-generated internal topics

I considered catching errors to add further information about naming internal state stores.
However, Topic.validate() will throw an error that prints the offending name, so I decided
not to add too much complexity.

Author: Nikki Thean <nthean@etsy.com>

Reviewers: Matthias J. Sax, Guozhang Wang, Eno Thereska, Damian Guy, Ismael Juma

Closes #2331 from nixsticks/internal-topics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e787716
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e787716
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e787716

Branch: refs/heads/trunk
Commit: 9e787716b013595851b4a6c1ddf8b8af1ec0f42e
Parents: ec8feb7
Author: Nikki Thean <nthean@etsy.com>
Authored: Fri Mar 17 12:03:02 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Mar 17 12:03:02 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/internals/Topic.java    | 60 ++++++++++++++
 .../kafka/common/internals/TopicTest.java       | 84 ++++++++++++++++++++
 .../kafka/streams/kstream/KGroupedStream.java   | 43 +++++++---
 .../kafka/streams/kstream/KGroupedTable.java    | 15 +++-
 .../apache/kafka/streams/kstream/KTable.java    |  4 +-
 .../kstream/internals/AbstractStream.java       |  3 +
 .../kstream/internals/KGroupedStreamImpl.java   |  4 +
 .../kstream/internals/KGroupedTableImpl.java    |  7 ++
 .../streams/processor/StateStoreSupplier.java   |  1 +
 .../internals/InternalTopicConfig.java          |  4 +
 .../internals/KGroupedStreamImplTest.java       | 46 ++++++++++-
 .../internals/KGroupedTableImplTest.java        | 12 +++
 .../internals/InternalTopicConfigTest.java      |  6 ++
 13 files changed, 274 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
new file mode 100644
index 0000000..b9971d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class Topic {
+
+    private static final String INVALID_CHARS = "[^a-zA-Z0-9._\\-]";
+    private static final int MAX_NAME_LENGTH = 249;
+    private static final Pattern INVALID_CHARS_PATTERN = Pattern.compile(INVALID_CHARS);
+    private static final Pattern ONLY_PERIODS_PATTERN = Pattern.compile("^[.]+$");
+
+    public static void validate(String topic) {
+        if (isEmpty(topic))
+            throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name is illegal, can't be empty");
+        else if (containsOnlyPeriods(topic))
+            throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name cannot be \".\" or \"..\"");
+        else if (exceedsMaxLength(topic))
+            throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name is illegal, can't be longer than " + MAX_NAME_LENGTH + " characters");
+        else if (containsInvalidCharacters(topic)) throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'");
+    }
+
+    static boolean isEmpty(String topic) {
+        return topic.isEmpty();
+    }
+
+    static boolean containsOnlyPeriods(String topic) {
+        Matcher matcher = ONLY_PERIODS_PATTERN.matcher(topic);
+        return matcher.find();
+    }
+
+    static boolean exceedsMaxLength(String topic) {
+        return topic.length() > MAX_NAME_LENGTH;
+    }
+
+    /**
+     * Valid characters for Kafka topics are the ASCII alphanumerics, '.', '_', and '-'
+     */
+    static boolean containsInvalidCharacters(String topic) {
+        Matcher matcher = INVALID_CHARS_PATTERN.matcher(topic);
+        return matcher.find();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
new file mode 100644
index 0000000..36ff521
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertTrue;
+
+public class TopicTest {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void shouldRecognizeValidTopicNames() {
+        String[] validTopicNames = {"valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_."};
+
+        for (String topicName : validTopicNames) {
+            Topic.validate(topicName);
+        }
+    }
+
+    @Test
+    public void shouldThrowOnInvalidTopicNames() {
+        String[] invalidTopicNames = {"", "foo bar", "..", "foo:bar", "foo=bar"};
+
+        for (String topicName : invalidTopicNames) {
+            thrown.expect(InvalidTopicException.class);
+            Topic.validate(topicName);
+        }
+    }
+
+    @Test
+    public void shouldRecognizeEmptyTopicNames() {
+        assertTrue(Topic.isEmpty(""));
+    }
+
+    @Test
+    public void shouldRecognizeTopicNamesThatExceedMaxLength() {
+        String longName = "ATCG";
+
+        for (int i = 0; i < 6; i++) {
+            longName += longName;
+        }
+
+        assertTrue(Topic.exceedsMaxLength(longName));
+    }
+
+    @Test
+    public void shouldRecognizeInvalidCharactersInTopicNames() {
+        Character[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='};
+
+        for (Character c : invalidChars) {
+            String topicName = "Is " + c + "illegal";
+            assertTrue(Topic.containsInvalidCharacters(topicName));
+        }
+    }
+
+    @Test
+    public void shouldRecognizeTopicNamesThatContainOnlyPeriods() {
+        String[] invalidTopicNames = {".", "..", "...."};
+
+        for (String topicName : invalidTopicNames) {
+            assertTrue(Topic.containsOnlyPeriods(topicName));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 01e4df9..c961c7e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -71,13 +71,16 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters
other than ASCII
+     * alphanumerics, '.', '_' and '-'.
      * The changelog topic will be named "${applicationId}-${storeName}-changelog", where
"applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
     * provided {@code storeName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param storeName the name of the underlying {@link KTable} state store
+     * @param storeName the name of the underlying {@link KTable} state store; valid characters
are ASCII
+     *                  alphanumerics, '.', '_' and '-'
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link
Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
      */
@@ -148,6 +151,8 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters
other than ASCII
+     * alphanumerics, '.', '_' and '-'.
      * The changelog topic will be named "${applicationId}-${storeName}-changelog", where
"applicationId" is
      * user-specified in {@link StreamsConfig StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
@@ -155,7 +160,8 @@ public interface KGroupedStream<K, V> {
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param windows   the specification of the aggregation {@link Windows}
-     * @param storeName the name of the underlying {@link KTable} state store
+     * @param storeName the name of the underlying {@link KTable} state store; valid characters
are ASCII
+     *                  alphanumerics, '.', '_' and '-'
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys
and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within
a window
      */
@@ -235,7 +241,8 @@ public interface KGroupedStream<K, V> {
      *
      *
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
-     * @param storeName      the name of the state store created from this operation.
+     * @param storeName      the name of the state store created from this operation; valid
characters are ASCII
+     *                       alphanumerics, '.', '_' and '-'
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys
and {@link Long} values
      * that represent the latest (rolling) count (i.e., number of records) for each key within
a window
      */
@@ -313,6 +320,8 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters
other than ASCII
+     * alphanumerics, '.', '_' and '-'.
      * The changelog topic will be named "${applicationId}-${storeName}-changelog", where
"applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
@@ -320,7 +329,8 @@ public interface KGroupedStream<K, V> {
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param reducer   a {@link Reducer} that computes a new aggregate result
-     * @param storeName the name of the underlying {@link KTable} state store
+     * @param storeName the name of the underlying {@link KTable} state store; valid characters
are ASCII
+     *                  alphanumerics, '.', '_' and '-'
      * @return a {@link KTable} that contains "update" records with unmodified keys, and
values that represent the
      * latest (rolling) aggregate for each key
      */
@@ -411,6 +421,8 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters
other than ASCII
+     * alphanumerics, '.', '_' and '-'.
      * The changelog topic will be named "${applicationId}-${storeName}-changelog", where
"applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
@@ -419,7 +431,8 @@ public interface KGroupedStream<K, V> {
      *
      * @param reducer   a {@link Reducer} that computes a new aggregate result
      * @param windows   the specification of the aggregation {@link Windows}
-     * @param storeName the name of the state store created from this operation
+     * @param storeName the name of the state store created from this operation; valid characters
are ASCII
+     *                  alphanumerics, '.', '_' and '-'
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys,
and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
@@ -516,6 +529,8 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters
other than ASCII
+     * alphanumerics, '.', '_' and '-'.
      * The changelog topic will be named "${applicationId}-${storeName}-changelog", where
"applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
@@ -523,7 +538,8 @@ public interface KGroupedStream<K, V> {
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      * @param reducer           the instance of {@link Reducer}
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
-     * @param storeName         the name of the state store created from this operation
+     * @param storeName         the name of the state store created from this operation;
valid characters are ASCII
+     *                          alphanumerics, '.', '_' and '-'
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys,
and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
@@ -570,6 +586,8 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters
other than ASCII
+     * alphanumerics, '.', '_' and '-'.
      * The changelog topic will be named "${applicationId}-${storeName}-changelog", where
"applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
@@ -622,6 +640,8 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters
other than ASCII
+     * alphanumerics, '.', '_' and '-'.
      * The changelog topic will be named "${applicationId}-${storeName}-changelog", where
"applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
@@ -632,7 +652,8 @@ public interface KGroupedStream<K, V> {
      * @param aggregator    an {@link Aggregator} that computes a new aggregate result
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will
be used
-     * @param storeName     the name of the state store created from this operation
+     * @param storeName     the name of the state store created from this operation; valid
characters are ASCII
+     *                      alphanumerics, '.', '_' and '-'
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and
values that represent the
      * latest (rolling) aggregate for each key
@@ -732,6 +753,8 @@ public interface KGroupedStream<K, V> {
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that
will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters
other than ASCII
+     * alphanumerics, '.', '_' and '-'.
      * The changelog topic will be named "${applicationId}-${storeName}-changelog", where
"applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
@@ -745,7 +768,8 @@ public interface KGroupedStream<K, V> {
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will
be used
      * @param <VR>          the value type of the resulting {@link KTable}
-     * @param storeName     the name of the state store created from this operation
+     * @param storeName     the name of the state store created from this operation; valid
characters are ASCII
+     *                      alphanumerics, '.', '_' and '-'
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys,
and values that represent
      * the latest (rolling) aggregate for each key within a window
      */
@@ -857,7 +881,8 @@ public interface KGroupedStream<K, V> {
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will
be used
      * @param <T>           the value type of the resulting {@link KTable}
-     * @param storeName     the name of the state store created from this operation
+     * @param storeName     the name of the state store created from this operation; valid
characters are ASCII
+     *                      alphanumerics, '.', '_' and '-'
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys,
and values that represent
      * the latest (rolling) aggregate for each key within a window
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 9201813..8685e8b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -72,9 +72,12 @@ public interface KGroupedTable<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * The store name must be a valid Kafka topic name and cannot contain characters other
than ASCII alphanumerics,
+     * '.', '_' and '-'.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param storeName the name of the underlying {@link KTable} state store
+     * @param storeName     the name of the underlying {@link KTable} state store; valid
characters are ASCII
+     *                      alphanumerics, '.', '_' and '-'
      * @return a {@link KTable} that contains "update" records with unmodified keys and {@link
Long} values that
      * represent the latest (rolling) count (i.e., number of records) for each key
      */
@@ -176,11 +179,14 @@ public interface KGroupedTable<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * The store name must be a valid Kafka topic name and cannot contain characters other
than ASCII alphanumerics,
+     * '.', '_' and '-'.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param adder      a {@link Reducer} that adds a new value to the aggregate result
      * @param subtractor a {@link Reducer} that removed an old value from the aggregate result
-     * @param storeName  the name of the underlying {@link KTable} state store
+     * @param storeName     the name of the underlying {@link KTable} state store; valid
characters are ASCII alphanumerics,
+     *                      '.', '_' and '-'
      * @return a {@link KTable} that contains "update" records with unmodified keys, and
values that represent the
      * latest (rolling) aggregate for each key
      */
@@ -404,6 +410,8 @@ public interface KGroupedTable<K, V> {
      * user-specified in {@link StreamsConfig} via parameter
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is
the
     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * The store name must be a valid Kafka topic name and cannot contain characters other
than ASCII alphanumerics,
+     * '.', '_' and '-'.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
      * @param initializer   a {@link Initializer} that provides an initial aggregate result
value
@@ -411,7 +419,8 @@ public interface KGroupedTable<K, V> {
      * @param subtractor    a {@link Aggregator} that removed an old record from the aggregate
result
      * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
      *                      if not specified the default serdes defined in the configs will
be used
-     * @param storeName     the name of the underlying {@link KTable} state store
+     * @param storeName     the name of the underlying {@link KTable} state store; valid
characters are ASCII
+     *                      alphanumerics, '.', '_' and '-'
      * @param <VR>          the value type of the aggregated {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and
values that represent the
      * latest (rolling) aggregate for each key

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 807a11c..de7c153 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -356,9 +356,11 @@ public interface KTable<K, V> {
      * <p>
      * The resulting {@code KTable} will be materialized in a local state store with the
given store name (cf.
      * {@link KStreamBuilder#table(String, String)})
+     * The store name must be a valid Kafka topic name and cannot contain characters other
than ASCII alphanumerics, '.', '_' and '-'.
      *
      * @param topic     the topic name
-     * @param storeName the state store name used for the result {@code KTable}
+     * @param storeName the state store name used for the result {@code KTable}; valid characters
are ASCII
+     *                  alphanumerics, '.', '_' and '-'
      * @return a {@code KTable} that contains the exact same (and potentially repartitioned)
records as this {@code KTable}
      */
     KTable<K, V> through(final String topic,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 3ff7e6a..dce5d12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -70,6 +71,7 @@ public abstract class AbstractStream<K> {
                                                                    final Serde<T> aggValueSerde,
                                                                    final String storeName)
{
         Objects.requireNonNull(storeName, "storeName can't be null");
+        Topic.validate(storeName);
         return storeFactory(keySerde, aggValueSerde, storeName).build();
     }
 
@@ -79,6 +81,7 @@ public abstract class AbstractStream<K> {
                                                                                    final
Windows<W> windows,
                                                                                    final
String storeName) {
         Objects.requireNonNull(storeName, "storeName can't be null");
+        Topic.validate(storeName);
         return storeFactory(keySerde, aggValSerde, storeName)
                 .windowed(windows.size(), windows.maintainMs(), windows.segments, false)
                 .build();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 1b042d5..cc6a126 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -202,6 +203,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements
KGroupedStre
                                                 final Serde<T> aggValueSerde,
                                                 final String storeName) {
         Objects.requireNonNull(storeName, "storeName can't be null");
+        Topic.validate(storeName);
         return aggregate(initializer,
                          aggregator,
                          sessionMerger,
@@ -237,6 +239,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements
KGroupedStre
     @SuppressWarnings("unchecked")
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
final String storeName) {
         Objects.requireNonNull(storeName, "storeName can't be null");
+        Topic.validate(storeName);
         return count(sessionWindows,
                      storeFactory(keySerde, Serdes.Long(), storeName)
                              .sessionWindowed(sessionWindows.maintainMs()).build());
@@ -278,6 +281,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements
KGroupedStre
                                          final String storeName) {
 
         Objects.requireNonNull(storeName, "storeName can't be null");
+        Topic.validate(storeName);
         return reduce(reducer, sessionWindows,
                       storeFactory(keySerde, valSerde, storeName)
                               .sessionWindowed(sessionWindows.maintainMs()).build());

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index afbdbf9..08a4c5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -72,6 +73,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K>
implements KGroup
                                       Aggregator<? super K, ? super V, T> adder,
                                       Aggregator<? super K, ? super V, T> subtractor,
                                       String storeName) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        Topic.validate(storeName);
         return aggregate(initializer, adder, subtractor, null, storeName);
     }
 
@@ -124,6 +127,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K>
implements KGroup
     public KTable<K, V> reduce(Reducer<V> adder,
                                Reducer<V> subtractor,
                                String storeName) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        Topic.validate(storeName);
         return reduce(adder, subtractor, keyValueStore(keySerde, valSerde, storeName));
     }
 
@@ -140,6 +145,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K>
implements KGroup
 
     @Override
     public KTable<K, Long> count(String storeName) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        Topic.validate(storeName);
         return count(keyValueStore(keySerde, Serdes.Long(), storeName));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
index b0ad256..173dab9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -27,6 +27,7 @@ public interface StateStoreSupplier<T extends StateStore> {
 
     /**
      * Return the name of this state store supplier.
+     * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'
      *
      * @return the name of this state store supplier
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index 522e9f8..1c8ca6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.internals.Topic;
+
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
@@ -36,6 +38,8 @@ public class InternalTopicConfig {
 
     public InternalTopicConfig(final String name, final Set<CleanupPolicy> defaultCleanupPolicies,
final Map<String, String> logConfig) {
         Objects.requireNonNull(name, "name can't be null");
+        Topic.validate(name);
+
         if (defaultCleanupPolicies.isEmpty()) {
             throw new IllegalArgumentException("Must provide at least one cleanup policy");
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index f4cefe4..8d1a789 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -55,6 +56,7 @@ import static org.junit.Assert.assertEquals;
 public class KGroupedStreamImplTest {
 
     private static final String TOPIC = "topic";
+    private static final String INVALID_STORE_NAME = "~foo bar~";
     private final KStreamBuilder builder = new KStreamBuilder();
     private KGroupedStream<String, String> groupedStream;
     private KStreamTestDriver driver = null;
@@ -83,6 +85,11 @@ public class KGroupedStreamImplTest {
         groupedStream.reduce(MockReducer.STRING_ADDER, (String) null);
     }
 
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotHaveInvalidStoreNameOnReduce() throws Exception {
+        groupedStream.reduce(MockReducer.STRING_ADDER, INVALID_STORE_NAME);
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreSupplierOnReduce() throws Exception {
         groupedStream.reduce(MockReducer.STRING_ADDER, (StateStoreSupplier<KeyValueStore>)
null);
@@ -103,6 +110,11 @@ public class KGroupedStreamImplTest {
         groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), (String) null);
     }
 
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotHaveInvalidStoreNameWithWindowedReduce() throws Exception {
+        groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), INVALID_STORE_NAME);
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullInitializerOnAggregate() throws Exception {
         groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String(), "store");
@@ -118,6 +130,11 @@ public class KGroupedStreamImplTest {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
Serdes.String(), null);
     }
 
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotHaveInvalidStoreNameOnAggregate() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
Serdes.String(), INVALID_STORE_NAME);
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullInitializerOnWindowedAggregate() throws Exception {
         groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10),
Serdes.String(), "store");
@@ -138,6 +155,11 @@ public class KGroupedStreamImplTest {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
TimeWindows.of(10), Serdes.String(), null);
     }
 
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() throws Exception {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
TimeWindows.of(10), Serdes.String(), INVALID_STORE_NAME);
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
TimeWindows.of(10), null);
@@ -268,8 +290,13 @@ public class KGroupedStreamImplTest {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (String)
null);
     }
 
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() throws Exception
{
+        groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), INVALID_STORE_NAME);
+    }
+
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullStateStoreSupplierNameWhenReducingSessionWindows() throws
Exception {
+    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() throws
Exception {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (StateStoreSupplier<SessionStore>)
null);
     }
 
@@ -323,6 +350,16 @@ public class KGroupedStreamImplTest {
         }, SessionWindows.with(10), Serdes.String(), (String) null);
     }
 
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() throws Exception
{
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
new Merger<String, String>() {
+            @Override
+            public String apply(final String aggKey, final String aggOne, final String aggTwo)
{
+                return null;
+            }
+        }, SessionWindows.with(10), Serdes.String(), INVALID_STORE_NAME);
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows()
throws Exception {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
new Merger<String, String>() {
@@ -343,8 +380,13 @@ public class KGroupedStreamImplTest {
         groupedStream.count(SessionWindows.with(90), (String) null);
     }
 
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotAcceptInvalidStoreNameWhenCountingSessionWindows() throws Exception
{
+        groupedStream.count(SessionWindows.with(90), INVALID_STORE_NAME);
+    }
+
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullStoreStoreSupplierNameWhenCountingSessionWindows() throws
Exception {
+    public void shouldNotAcceptNullStateStoreSupplierWhenCountingSessionWindows() throws
Exception {
         groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>)
null);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 8888bfb..4934204 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -42,6 +43,7 @@ import static org.junit.Assert.assertEquals;
 public class KGroupedTableImplTest {
 
     private final KStreamBuilder builder = new KStreamBuilder();
+    private static final String INVALID_STORE_NAME = "~foo bar~";
     private KGroupedTable<String, String> groupedTable;
     private KStreamTestDriver driver = null;
 
@@ -64,6 +66,11 @@ public class KGroupedTableImplTest {
         groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
MockAggregator.TOSTRING_REMOVER, (String) null);
     }
 
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotAllowInvalidStoreNameOnAggregate() throws Exception {
+        groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER,
MockAggregator.TOSTRING_REMOVER, INVALID_STORE_NAME);
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullInitializerOnAggregate() throws Exception {
         groupedTable.aggregate(null, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER,
"store");
@@ -94,6 +101,11 @@ public class KGroupedTableImplTest {
         groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (String)
null);
     }
 
+    @Test(expected = InvalidTopicException.class)
+    public void shouldNotAllowInvalidStoreNameOnReduce() throws Exception {
+        groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, INVALID_STORE_NAME);
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullStoreSupplierOnReduce() throws Exception {
         groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (String)
null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e787716/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
index 9c3ef31..55534b7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 
@@ -45,6 +46,11 @@ public class InternalTopicConfigTest {
         new InternalTopicConfig(null, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
Collections.<String, String>emptyMap());
     }
 
+    @Test(expected = InvalidTopicException.class)
+    public void shouldThrowIfNameIsInvalid() throws Exception {
+        new InternalTopicConfig("foo bar baz", Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
Collections.<String, String>emptyMap());
+    }
+
     @Test
     public void shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete() throws
Exception {
         final InternalTopicConfig topicConfig = new InternalTopicConfig("name",


Mime
View raw message