kafka-commits mailing list archives

From ij...@apache.org
Subject [2/2] kafka git commit: MINOR: Consolidate Topic classes
Date Mon, 15 May 2017 08:10:21 GMT
MINOR: Consolidate Topic classes

During the 0.11.0.0 cycle, a Java version of the
`Topic` class was introduced so that Streams could
use it. Since it already covers the bulk of the
functionality of the Scala version, it makes sense
to consolidate the two.

While doing this, I noticed that one of the tests for
the Java class (`shouldThrowOnInvalidTopicNames`) was
broken: it used JUnit's `ExpectedException` rule inside
a loop, so it only verified that the first topic name
in the list was invalid.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3046 from ijuma/consolidate-topic-classes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc55f852
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc55f852
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc55f852

Branch: refs/heads/trunk
Commit: bc55f85237cb46e73c6774298cf308060a4a739c
Parents: 885643c
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon May 15 09:10:09 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon May 15 09:10:09 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/common/internals/Topic.java    |  66 ++++++++----
 .../kafka/common/internals/TopicTest.java       |  89 +++++++++-------
 .../src/main/scala/kafka/admin/AdminUtils.scala |   3 +-
 .../kafka/admin/ConsumerGroupCommand.scala      |   9 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |   6 +-
 core/src/main/scala/kafka/common/Topic.scala    |  74 --------------
 .../main/scala/kafka/consumer/TopicFilter.scala |   5 +-
 .../coordinator/group/GroupCoordinator.scala    |   5 +-
 .../group/GroupMetadataManager.scala            |  16 +--
 .../transaction/TransactionStateManager.scala   |  11 +-
 core/src/main/scala/kafka/log/Log.scala         |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  22 ++--
 .../main/scala/kafka/server/MetadataCache.scala |   3 +-
 .../scala/kafka/server/ReplicaManager.scala     |   3 +-
 .../kafka/tools/StateChangeLogMerger.scala      |   5 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  18 ++--
 .../kafka/api/BaseConsumerTest.scala            |   4 +-
 .../api/GroupCoordinatorIntegrationTest.scala   |   6 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |   4 +-
 ...tenersWithSameSecurityProtocolBaseTest.scala |   4 +-
 .../unit/kafka/admin/TopicCommandTest.scala     |  12 +--
 .../scala/unit/kafka/common/TopicTest.scala     | 102 -------------------
 .../unit/kafka/consumer/TopicFilterTest.scala   |  10 +-
 .../group/GroupCoordinatorResponseTest.scala    |  31 +++---
 .../group/GroupMetadataManagerTest.scala        |  42 ++++----
 .../TransactionCoordinatorIntegrationTest.scala |   4 +-
 .../TransactionStateManagerTest.scala           |   9 +-
 .../unit/kafka/server/MetadataRequestTest.scala |   7 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   5 +-
 29 files changed, 230 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index b9971d0..dd6dbcf 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -16,45 +16,67 @@
  */
 package org.apache.kafka.common.internals;
 
-import java.util.regex.Matcher;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Collections;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 public class Topic {
 
-    private static final String INVALID_CHARS = "[^a-zA-Z0-9._\\-]";
+    public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
+    public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
+    public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";
+
+    private static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
+            Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME));
+
     private static final int MAX_NAME_LENGTH = 249;
-    private static final Pattern INVALID_CHARS_PATTERN = Pattern.compile(INVALID_CHARS);
-    private static final Pattern ONLY_PERIODS_PATTERN = Pattern.compile("^[.]+$");
+    private static final Pattern LEGAL_CHARS_PATTERN = Pattern.compile(LEGAL_CHARS + "+");
 
     public static void validate(String topic) {
-        if (isEmpty(topic))
-            throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name is illegal, can't be empty");
-        else if (containsOnlyPeriods(topic))
-            throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name cannot be \".\" or \"..\"");
-        else if (exceedsMaxLength(topic))
-            throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name is illegal, can't be longer than " + MAX_NAME_LENGTH + " characters");
-        else if (containsInvalidCharacters(topic)) throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'");
+        if (topic.isEmpty())
+            throw new InvalidTopicException("Topic name is illegal, it can't be empty");
+        if (topic.equals(".") || topic.equals(".."))
+            throw new InvalidTopicException("Topic name cannot be \".\" or \"..\"");
+        if (topic.length() > MAX_NAME_LENGTH)
+            throw new InvalidTopicException("Topic name is illegal, it can't be longer than " + MAX_NAME_LENGTH +
+                    " characters, topic name: " + topic);
+        if (!containsValidPattern(topic))
+            throw new InvalidTopicException("Topic name \"" + topic + "\" is illegal, it contains a character other than " +
+                    "ASCII alphanumerics, '.', '_' and '-'");
     }
 
-    static boolean isEmpty(String topic) {
-        return topic.isEmpty();
+    public static boolean isInternal(String topic) {
+        return INTERNAL_TOPICS.contains(topic);
     }
 
-    static boolean containsOnlyPeriods(String topic) {
-        Matcher matcher = ONLY_PERIODS_PATTERN.matcher(topic);
-        return matcher.find();
+    /**
+     * Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide.
+     *
+     * @param topic The topic to check for colliding character
+     * @return true if the topic has collision characters
+     */
+    public static boolean hasCollisionChars(String topic) {
+        return topic.contains("_") || topic.contains(".");
     }
 
-    static boolean exceedsMaxLength(String topic) {
-        return topic.length() > MAX_NAME_LENGTH;
+    /**
+     * Returns true if the topicNames collide due to a period ('.') or underscore ('_') in the same position.
+     *
+     * @param topicA A topic to check for collision
+     * @param topicB A topic to check for collision
+     * @return true if the topics collide
+     */
+    public static boolean hasCollision(String topicA, String topicB) {
+        return topicA.replace('.', '_').equals(topicB.replace('.', '_'));
     }
 
     /**
      * Valid characters for Kafka topics are the ASCII alphanumerics, '.', '_', and '-'
      */
-    static boolean containsInvalidCharacters(String topic) {
-        Matcher matcher = INVALID_CHARS_PATTERN.matcher(topic);
-        return matcher.find();
+    static boolean containsValidPattern(String topic) {
+        return LEGAL_CHARS_PATTERN.matcher(topic).matches();
     }
-
 }
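
The consolidated class is now the single entry point for
topic-name rules. A short usage sketch, assuming kafka-clients
on the classpath (the class name and topic names below are
illustrative):

import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.internals.Topic;

public class TopicRulesSketch {
    public static void main(String[] args) {
        Topic.validate("my-topic.v1"); // legal: ASCII alphanumerics, '.', '_', '-'

        try {
            Topic.validate("foo:bar"); // ':' is not a legal character
        } catch (InvalidTopicException e) {
            System.out.println(e.getMessage());
        }

        System.out.println(Topic.isInternal("__consumer_offsets")); // true
        System.out.println(Topic.hasCollisionChars("my_topic"));    // true

        // Metric names replace '.' with '_', so these two distinct topics
        // would collide on the same metric name:
        System.out.println(Topic.hasCollision("my.topic", "my_topic")); // true
        System.out.println(Topic.hasCollision("my.topic", "mytopic.")); // false
    }
}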

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
index 36ff521..f7475c4 100644
--- a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
@@ -17,20 +17,23 @@
 package org.apache.kafka.common.internals;
 
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.junit.Rule;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TopicTest {
 
-    @Rule
-    public ExpectedException thrown = ExpectedException.none();
-
     @Test
-    public void shouldRecognizeValidTopicNames() {
-        String[] validTopicNames = {"valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_."};
+    public void shouldAcceptValidTopicNames() {
+        String maxLengthString = TestUtils.randomString(249);
+        String[] validTopicNames = {"valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_.", "...", maxLengthString};
 
         for (String topicName : validTopicNames) {
             Topic.validate(topicName);
@@ -39,46 +42,64 @@ public class TopicTest {
 
     @Test
     public void shouldThrowOnInvalidTopicNames() {
-        String[] invalidTopicNames = {"", "foo bar", "..", "foo:bar", "foo=bar"};
+        char[] longString = new char[250];
+        Arrays.fill(longString, 'a');
+        String[] invalidTopicNames = {"", "foo bar", "..", "foo:bar", "foo=bar", ".", new String(longString)};
 
         for (String topicName : invalidTopicNames) {
-            thrown.expect(InvalidTopicException.class);
-            Topic.validate(topicName);
+            try {
+                Topic.validate(topicName);
+                fail("No exception was thrown for topic with invalid name: " + topicName);
+            } catch (InvalidTopicException e) {
+                // Good
+            }
         }
     }
 
     @Test
-    public void shouldRecognizeEmptyTopicNames() {
-        assertTrue(Topic.isEmpty(""));
-    }
-
-    @Test
-    public void shouldRecognizeTopicNamesThatExceedMaxLength() {
-        String longName = "ATCG";
+    public void shouldRecognizeInvalidCharactersInTopicNames() {
+        char[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='};
 
-        for (int i = 0; i < 6; i++) {
-            longName += longName;
+        for (char c : invalidChars) {
+            String topicName = "Is " + c + "illegal";
+            assertFalse(Topic.containsValidPattern(topicName));
         }
-
-        assertTrue(Topic.exceedsMaxLength(longName));
     }
 
     @Test
-    public void shouldRecognizeInvalidCharactersInTopicNames() {
-        Character[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='};
-
-        for (Character c : invalidChars) {
-            String topicName = "Is " + c + "illegal";
-            assertTrue(Topic.containsInvalidCharacters(topicName));
-        }
+    public void testTopicHasCollisionChars() {
+        List<String> falseTopics = Arrays.asList("start", "end", "middle", "many");
+        List<String> trueTopics = Arrays.asList(
+                ".start", "end.", "mid.dle", ".ma.ny.",
+                "_start", "end_", "mid_dle", "_ma_ny."
+        );
+
+        for (String topic : falseTopics)
+            assertFalse(Topic.hasCollisionChars(topic));
+
+        for (String topic : trueTopics)
+            assertTrue(Topic.hasCollisionChars(topic));
     }
 
     @Test
-    public void shouldRecognizeTopicNamesThatContainOnlyPeriods() {
-        String[] invalidTopicNames = {".", "..", "...."};
+    public void testTopicHasCollision() {
+        List<String> periodFirstMiddleLastNone = Arrays.asList(".topic", "to.pic", "topic.", "topic");
+        List<String> underscoreFirstMiddleLastNone = Arrays.asList("_topic", "to_pic", "topic_", "topic");
 
-        for (String topicName : invalidTopicNames) {
-            assertTrue(Topic.containsOnlyPeriods(topicName));
-        }
+        // Self
+        for (String topic : periodFirstMiddleLastNone)
+            assertTrue(Topic.hasCollision(topic, topic));
+
+        for (String topic : underscoreFirstMiddleLastNone)
+            assertTrue(Topic.hasCollision(topic, topic));
+
+        // Same Position
+        for (int i = 0; i < periodFirstMiddleLastNone.size(); ++i)
+            assertTrue(Topic.hasCollision(periodFirstMiddleLastNone.get(i), underscoreFirstMiddleLastNone.get(i)));
+
+        // Different Position
+        Collections.reverse(underscoreFirstMiddleLastNone);
+        for (int i = 0; i < periodFirstMiddleLastNone.size(); ++i)
+            assertFalse(Topic.hasCollision(periodFirstMiddleLastNone.get(i), underscoreFirstMiddleLastNone.get(i)));
     }
-}
\ No newline at end of file
+}
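
One behavioral detail the updated tests make visible: the old
Java class rejected any name consisting solely of periods (via
ONLY_PERIODS_PATTERN), while the consolidated version, like the
removed Scala class, rejects exactly "." and "..", so "..." now
validates, as `shouldAcceptValidTopicNames` shows. A quick check,
assuming the same setup as the sketch above:

import org.apache.kafka.common.internals.Topic;

public class PeriodOnlyNamesSketch {
    public static void main(String[] args) {
        Topic.validate("..."); // accepted: '.' is a legal character and the
                               // name is not exactly "." or ".."
        Topic.validate("..");  // throws InvalidTopicException
    }
}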

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index ca81af3..49d249b 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -17,7 +17,6 @@
 
 package kafka.admin
 
-import kafka.common._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
@@ -26,6 +25,7 @@ import kafka.utils.ZkUtils._
 import java.util.Random
 import java.util.Properties
 
+import kafka.common.TopicAlreadyMarkedForDeletionException
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, LeaderNotAvailableException, ReplicaNotAvailableException, TopicExistsException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.network.ListenerName
@@ -39,6 +39,7 @@ import scala.collection.mutable
 import collection.Map
 import collection.Set
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import org.apache.kafka.common.internals.Topic
 
 trait AdminUtilities {
   def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index caad62a..dd7a477 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -20,18 +20,17 @@ package kafka.admin
 import java.util.Properties
 
 import joptsimple.{OptionParser, OptionSpec}
-
 import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.client.ClientUtils
-import kafka.common.{TopicAndPartition, _}
+import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
-
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.common.Node
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.BrokerNotAvailableException
+import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.security.JaasUtils

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 57dfc5a..942d70e 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -18,8 +18,9 @@
 package kafka.admin
 
 import java.util.Properties
+
 import joptsimple._
-import kafka.common.{AdminCommandFailedException, Topic}
+import kafka.common.AdminCommandFailedException
 import kafka.consumer.Whitelist
 import kafka.log.LogConfig
 import kafka.server.ConfigType
@@ -27,6 +28,7 @@ import kafka.utils.ZkUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
 
@@ -135,7 +137,7 @@ object TopicCommand extends Logging {
       }
 
       if(opts.options.has(opts.partitionsOpt)) {
-        if (topic == Topic.GroupMetadataTopicName) {
+        if (topic == Topic.GROUP_METADATA_TOPIC_NAME) {
           throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
         }
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
deleted file mode 100644
index 6ca7175..0000000
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-import util.matching.Regex
-
-import scala.collection.immutable
-
-object Topic {
-
-  val GroupMetadataTopicName = "__consumer_offsets"
-  val TransactionStateTopicName = "__transaction_state"
-  val InternalTopics = immutable.Set(GroupMetadataTopicName, TransactionStateTopicName)
-
-  val legalChars = "[a-zA-Z0-9\\._\\-]"
-  private val maxNameLength = 249
-  private val rgx = new Regex(legalChars + "+")
-
-  def validate(topic: String) {
-    if (topic.length <= 0)
-      throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be empty")
-    else if (topic.equals(".") || topic.equals(".."))
-      throw new org.apache.kafka.common.errors.InvalidTopicException("topic name cannot be \".\" or \"..\"")
-    else if (topic.length > maxNameLength)
-      throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters")
-
-    rgx.findFirstIn(topic) match {
-      case Some(t) =>
-        if (!t.equals(topic))
-          throw new org.apache.kafka.common.errors.InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
-      case None => throw new org.apache.kafka.common.errors.InvalidTopicException("topic name " + topic + " is illegal,  contains a character other than ASCII alphanumerics, '.', '_' and '-'")
-    }
-  }
-
-  /**
-   * Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide.
-   *
-   * @param topic The topic to check for colliding character
-   * @return true if the topic has collision characters
-   */
-  def hasCollisionChars(topic: String): Boolean = {
-    topic.contains("_") || topic.contains(".")
-  }
-
-  /**
-   * Returns true if the topicNames collide due to a period ('.') or underscore ('_') in the same position.
-   *
-   * @param topicA A topic to check for collision
-   * @param topicB A topic to check for collision
-   * @return true if the topics collide
-   */
-  def hasCollision(topicA: String, topicB: String): Boolean = {
-    topicA.replace('.', '_') == topicB.replace('.', '_')
-  }
-
-  def isInternal(topic: String): Boolean =
-    InternalTopics.contains(topic)
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 914e9b9..69d7455 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -18,8 +18,9 @@
 package kafka.consumer
 
 import kafka.utils.Logging
-import java.util.regex.{PatternSyntaxException, Pattern}
-import kafka.common.Topic
+import java.util.regex.{Pattern, PatternSyntaxException}
+
+import org.apache.kafka.common.internals.Topic
 
 sealed abstract class TopicFilter(rawRegex: String) extends Logging {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index a57b6be..031a9c1 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -19,12 +19,13 @@ package kafka.coordinator.group
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.common.{OffsetAndMetadata, Topic}
+import kafka.common.OffsetAndMetadata
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse, TransactionResult}
@@ -428,7 +429,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleTxnCompletion(producerId: Long,
                           topicPartitions: Seq[TopicPartition],
                           transactionResult: TransactionResult) {
-    val offsetPartitions = topicPartitions.filter(_.topic() == Topic.GroupMetadataTopicName).map(_.partition).toSet
+    val offsetPartitions = topicPartitions.filter(_.topic == Topic.GROUP_METADATA_TOPIC_NAME).map(_.partition).toSet
     groupManager.handleTxnCompletion(producerId, offsetPartitions, transactionResult == TransactionResult.COMMIT)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 222bcdc..74a3f7b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -27,13 +27,13 @@ import java.util.concurrent.locks.ReentrantLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
 import kafka.common.{MessageFormatter, _}
-import kafka.coordinator.group._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
 import kafka.utils._
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
@@ -171,7 +171,7 @@ class GroupMetadataManager(brokerId: Int,
           builder.build()
         }
 
-        val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+        val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
         val groupMetadataRecords = Map(groupMetadataPartition -> records)
         val generationId = group.generationId
 
@@ -284,7 +284,7 @@ class GroupMetadataManager(brokerId: Int,
             val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
             new SimpleRecord(timestamp, key, value)
           }
-          val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+          val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
           val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
 
           if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
@@ -431,7 +431,7 @@ class GroupMetadataManager(brokerId: Int,
    * Asynchronously read the partition from the offsets topic and populate the cache
    */
   def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
-    val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+    val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
 
     def doLoadGroupsAndOffsets() {
       info(s"Loading offsets and group metadata from $topicPartition")
@@ -632,7 +632,7 @@ class GroupMetadataManager(brokerId: Int,
    */
   def removeGroupsForPartition(offsetsPartition: Int,
                                onGroupUnloaded: GroupMetadata => Unit) {
-    val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+    val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
     scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets _)
 
     def removeGroupsAndOffsets() {
@@ -687,7 +687,7 @@ class GroupMetadataManager(brokerId: Int,
       }
 
       val offsetsPartition = partitionFor(groupId)
-      val appendPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+      val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
       getMagic(offsetsPartition) match {
         case Some(magicValue) =>
           // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
@@ -802,7 +802,7 @@ class GroupMetadataManager(brokerId: Int,
    * If the topic does not exist, the configured partition count is returned.
    */
   private def getGroupMetadataTopicPartitionCount: Int = {
-    zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName).getOrElse(config.offsetsTopicNumPartitions)
+    zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)
   }
 
   /**
@@ -812,7 +812,7 @@ class GroupMetadataManager(brokerId: Int,
    * @return  Some(MessageFormatVersion) if replica is local, None otherwise
    */
   private def getMagic(partition: Int): Option[Byte] =
-    replicaManager.getMagic(new TopicPartition(Topic.GroupMetadataTopicName, partition))
+    replicaManager.getMagic(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))
 
   /**
    * Add the partition into the owned list

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 7a03fc3..1106e7c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -22,13 +22,14 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 
-import kafka.common.{KafkaException, Topic}
+import kafka.common.KafkaException
 import kafka.log.LogConfig
 import kafka.message.UncompressedCodec
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.{Logging, Pool, Scheduler, ZkUtils}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.IsolationLevel
@@ -182,7 +183,7 @@ class TransactionStateManager(brokerId: Int,
    * If the topic does not exist, the default partition count is returned.
    */
   private def getTransactionTopicPartitionCount: Int = {
-    zkUtils.getTopicPartitionCount(Topic.TransactionStateTopicName).getOrElse(config.transactionLogNumPartitions)
+    zkUtils.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogNumPartitions)
   }
 
   private def loadTransactionMetadata(topicPartition: TopicPartition, coordinatorEpoch: Int): Pool[String, TransactionMetadata] =  {
@@ -274,7 +275,7 @@ class TransactionStateManager(brokerId: Int,
   def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback) {
     validateTransactionTopicPartitionCountIsStable()
 
-    val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partitionId)
+    val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
 
     inLock(stateLock) {
       loadingPartitions.add(partitionId)
@@ -320,7 +321,7 @@ class TransactionStateManager(brokerId: Int,
   def removeTransactionsForTxnTopicPartition(partitionId: Int) {
     validateTransactionTopicPartitionCountIsStable()
 
-    val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partitionId)
+    val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
 
     def removeTransactions() {
       inLock(stateLock) {
@@ -359,7 +360,7 @@ class TransactionStateManager(brokerId: Int,
 
     val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, new SimpleRecord(timestamp, keyBytes, valueBytes))
 
-    val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partitionFor(transactionalId))
+    val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))
     val recordsPerPartition = Map(topicPartition -> records)
 
     // set the callback function to update transaction status in cache after log append completed

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index ed4eff2..7203033 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -45,6 +45,8 @@ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import java.util.Map.{Entry => JEntry}
 import java.lang.{Long => JLong}
 
+import org.apache.kafka.common.internals.Topic
+
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
     NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
@@ -778,7 +780,7 @@ class Log(@volatile var dir: File,
                               loadingFromLog: Boolean): Unit = {
     val pid = batch.producerId
     val appendInfo = producers.getOrElseUpdate(pid, new ProducerAppendInfo(pid, lastEntry, loadingFromLog))
-    val shouldValidateSequenceNumbers = topicPartition.topic() != Topic.GroupMetadataTopicName
+    val shouldValidateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
     val maybeCompletedTxn = appendInfo.append(batch, shouldValidateSequenceNumbers)
     maybeCompletedTxn.foreach(completedTxns += _)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1f8bea5..5f1a2d5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -28,7 +28,6 @@ import kafka.admin.{AdminUtils, RackAwareMode}
 import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse}
 import kafka.cluster.Partition
 import kafka.common.{KafkaStorageException, OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
-import kafka.common.Topic.{GroupMetadataTopicName, TransactionStateTopicName, isInternal}
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
@@ -40,6 +39,7 @@ import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe,
 import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.internals.FatalExitError
+import org.apache.kafka.common.internals.Topic.{isInternal, GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
@@ -139,16 +139,16 @@ class KafkaApis(val requestChannel: RequestChannel,
         // this callback is invoked under the replica state change lock to ensure proper order of
         // leadership changes
         updatedLeaders.foreach { partition =>
-          if (partition.topic == GroupMetadataTopicName)
+          if (partition.topic == GROUP_METADATA_TOPIC_NAME)
             groupCoordinator.handleGroupImmigration(partition.partitionId)
-          else if (partition.topic == TransactionStateTopicName)
+          else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
             txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch)
         }
 
         updatedFollowers.foreach { partition =>
-          if (partition.topic == GroupMetadataTopicName)
+          if (partition.topic == GROUP_METADATA_TOPIC_NAME)
             groupCoordinator.handleGroupEmigration(partition.partitionId)
-          else if (partition.topic == TransactionStateTopicName)
+          else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
             txnCoordinator.handleTxnEmigration(partition.partitionId)
         }
       }
@@ -185,7 +185,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
       // is not cleared.
       result.foreach { case (topicPartition, error) =>
-        if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GroupMetadataTopicName) {
+        if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GROUP_METADATA_TOPIC_NAME) {
           groupCoordinator.handleGroupEmigration(topicPartition.partition)
         }
       }
@@ -816,7 +816,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val aliveBrokers = metadataCache.getAliveBrokers
 
     topic match {
-      case GroupMetadataTopicName =>
+      case GROUP_METADATA_TOPIC_NAME =>
         if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
           error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
             s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
@@ -827,7 +827,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
             groupCoordinator.offsetsTopicConfigs)
         }
-      case TransactionStateTopicName =>
+      case TRANSACTION_STATE_TOPIC_NAME =>
         if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
           error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
             s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
@@ -1037,12 +1037,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match {
         case FindCoordinatorRequest.CoordinatorType.GROUP =>
           val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
-          val metadata = getOrCreateInternalTopic(GroupMetadataTopicName, request.listenerName)
+          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.listenerName)
           (partition, metadata)
 
         case FindCoordinatorRequest.CoordinatorType.TRANSACTION =>
           val partition = txnCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
-          val metadata = getOrCreateInternalTopic(TransactionStateTopicName, request.listenerName)
+          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.listenerName)
           (partition, metadata)
 
         case _ =>
@@ -1504,7 +1504,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val addOffsetsToTxnRequest = request.body[AddOffsetsToTxnRequest]
     val transactionalId = addOffsetsToTxnRequest.transactionalId
     val groupId = addOffsetsToTxnRequest.consumerGroupId
-    val offsetTopicPartition = new TopicPartition(GroupMetadataTopicName, groupCoordinator.partitionFor(groupId))
+    val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
     // Send response callback
     def sendResponseCallback(error: Errors): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 2e4c19a..4e1cd37 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -23,10 +23,11 @@ import scala.collection.{Seq, Set, mutable}
 import scala.collection.JavaConverters._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.api._
-import kafka.common.{BrokerEndPointNotAvailableException, Topic, TopicAndPartition}
+import kafka.common.{BrokerEndPointNotAvailableException, TopicAndPartition}
 import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9cd92f7..b3d1d32 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -46,7 +46,8 @@ import scala.collection._
 import scala.collection.JavaConverters._
 import java.util.{Map => JMap}
 
-import kafka.common.{KafkaStorageException, Topic}
+import kafka.common.KafkaStorageException
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.requests.EpochEndOffset._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index 0bd749a..b4b3722 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -25,10 +25,11 @@ import java.util.Date
 import java.text.SimpleDateFormat
 
 import kafka.utils.{CommandLineUtils, CoreUtils, Exit, Logging}
-import kafka.common.Topic
 import java.io.{BufferedOutputStream, OutputStream}
 import java.nio.charset.StandardCharsets
 
+import org.apache.kafka.common.internals.Topic
+
 /**
  * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days).
  *
@@ -47,7 +48,7 @@ import java.nio.charset.StandardCharsets
 object StateChangeLogMerger extends Logging {
 
   val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS"
-  val topicPartitionRegex = new Regex("\\[(" + Topic.legalChars + "+),( )*([0-9]+)\\]")
+  val topicPartitionRegex = new Regex("\\[(" + Topic.LEGAL_CHARS + "+),( )*([0-9]+)\\]")
   val dateRegex = new Regex("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}")
   val dateFormat = new SimpleDateFormat(dateFormatString)
   var files: List[String] = List()

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 4277d26..52a90d8 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -18,7 +18,6 @@ import java.util.concurrent.ExecutionException
 import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 
-import kafka.common
 import kafka.common.TopicAndPartition
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
@@ -27,6 +26,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests._
 import CreateTopicsRequest.TopicDetails
@@ -162,7 +162,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, common.Topic.GroupMetadataTopicName,
+    TestUtils.createTopic(zkUtils, GROUP_METADATA_TOPIC_NAME,
       1,
       1,
       servers,
@@ -563,8 +563,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     // set the subscription pattern to an internal topic that the consumer has read permission to. Since
     // internal topics are not included, we should not be assigned any partitions from this topic
-    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)),  new Resource(Topic, kafka.common.Topic.GroupMetadataTopicName))
-    consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)),  new Resource(Topic,
+      GROUP_METADATA_TOPIC_NAME))
+    consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new NoOpConsumerRebalanceListener)
     consumer.poll(0)
     assertTrue(consumer.subscription().isEmpty)
     assertTrue(consumer.assignment().isEmpty)
@@ -590,10 +591,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       assertEquals(Set(topic).asJava, consumer.subscription)
 
       // now authorize the user for the internal topic and verify that we can subscribe
-      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic, kafka.common.Topic.GroupMetadataTopicName))
-      consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener)
+      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic,
+        GROUP_METADATA_TOPIC_NAME))
+      consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new NoOpConsumerRebalanceListener)
       consumer.poll(0)
-      assertEquals(Set(kafka.common.Topic.GroupMetadataTopicName), consumer.subscription.asScala)
+      assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala)
     } finally consumer.close()
   }
 
@@ -605,7 +607,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-    val internalTopicResource = new Resource(Topic, kafka.common.Topic.GroupMetadataTopicName)
+    val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
 
     val consumerConfig = new Properties

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 8d394c6..992e74a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -20,7 +20,6 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import kafka.utils.{Logging, ShutdownableThread, TestUtils}
-import kafka.common.Topic
 import kafka.server.KafkaConfig
 import org.junit.Assert._
 import org.junit.{Before, Test}
@@ -29,6 +28,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Buffer}
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.internals.Topic
 
 /**
  * Integration tests for the new consumer that cover basic usage as well as server failures
@@ -103,7 +103,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     // get metadata for the topic
     var parts: Seq[PartitionInfo] = null
     while (parts == null)
-      parts = consumer0.partitionsFor(Topic.GroupMetadataTopicName).asScala
+      parts = consumer0.partitionsFor(Topic.GROUP_METADATA_TOPIC_NAME).asScala
     assertEquals(1, parts.size)
     assertNotNull(parts.head.leader())
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 09b715d..fd588de 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -12,7 +12,6 @@
  */
 package kafka.api
 
-import kafka.common.Topic
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.Log
 import kafka.server.KafkaConfig
@@ -26,6 +25,7 @@ import org.junit.Assert._
 import scala.collection.JavaConverters._
 import java.util.Properties
 
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.CompressionType
 
 class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
@@ -43,13 +43,13 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
     val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
                                                securityProtocol = SecurityProtocol.PLAINTEXT)
     val offsetMap = Map(
-      new TopicPartition(Topic.GroupMetadataTopicName, 0) -> new OffsetAndMetadata(10, "")
+      new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "")
     ).asJava
     consumer.commitSync(offsetMap)
     val logManager = servers.head.getLogManager
 
     def getGroupMetadataLogOpt: Option[Log] =
-      logManager.getLog(new TopicPartition(Topic.GroupMetadataTopicName, 0))
+      logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
 
     TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.batches.asScala.nonEmpty)),
                             "Commit message not appended in time")

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 7372ac4..0c44ca9 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -20,7 +20,6 @@ package kafka.api
 import java.util.concurrent.{ExecutionException, TimeoutException}
 import java.util.Properties
 
-import kafka.common.Topic
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
@@ -28,6 +27,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -226,7 +226,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   def testCannotSendToInternalTopic() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     val thrown = intercept[ExecutionException] {
-      producer2.send(new ProducerRecord(Topic.GroupMetadataTopicName, "test".getBytes, "test".getBytes)).get
+      producer2.send(new ProducerRecord(Topic.GROUP_METADATA_TOPIC_NAME, "test".getBytes, "test".getBytes)).get
     }
     assertTrue("Unexpected exception while sending to an invalid topic " + thrown.getCause, thrown.getCause.isInstanceOf[InvalidTopicException])
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 7db9d7c..4d879d0 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -23,7 +23,6 @@ import java.util.{Collections, Properties}
 import java.util.concurrent.TimeUnit
 
 import kafka.api.SaslSetup
-import kafka.common.Topic
 import kafka.coordinator.group.OffsetConfig
 import kafka.utils.JaasTestUtils.JaasSection
 import kafka.utils.TestUtils
@@ -31,6 +30,7 @@ import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.junit.Assert.assertEquals
 import org.junit.{After, Before, Test}
@@ -103,7 +103,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
         Internal, config.interBrokerListenerName.value)
     }
 
-    TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, OffsetConfig.DefaultOffsetsTopicNumPartitions,
+    TestUtils.createTopic(zkUtils, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
       replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
 
     servers.head.config.listeners.foreach { endPoint =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 5215867..ad6cfa5 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -18,7 +18,6 @@ package kafka.admin
 
 import org.junit.Assert._
 import org.junit.Test
-import kafka.common.Topic
 import kafka.utils.Logging
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
@@ -26,6 +25,7 @@ import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils._
 import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.internals.Topic
 
 class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
@@ -86,15 +86,15 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
-      "--topic", Topic.GroupMetadataTopicName))
+      "--topic", Topic.GROUP_METADATA_TOPIC_NAME))
     TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
 
-    // try to delete the Topic.GroupMetadataTopicName and make sure it doesn't
-    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GroupMetadataTopicName))
-    val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GroupMetadataTopicName)
+    // try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure the delete is rejected
+    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
+    val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
-        TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
+      TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
     }
     assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
   }

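The hunk above depends on TopicCommand.deleteTopic refusing to delete internal topics. A minimal sketch of the guard it exercises, assuming AdminOperationException's single-String constructor (InternalTopicGuard and ensureDeletable are hypothetical names; Topic.isInternal and the internal topic names are real):

import kafka.admin.AdminOperationException
import org.apache.kafka.common.internals.Topic

object InternalTopicGuard {
  // Hypothetical helper: refuse deletion of Kafka's internal topics
  // (__consumer_offsets, __transaction_state) before any delete path
  // is written to ZooKeeper.
  def ensureDeletable(topicName: String): Unit =
    if (Topic.isInternal(topicName))
      throw new AdminOperationException(
        s"Topic $topicName is a Kafka internal topic and cannot be deleted.")
}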
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/common/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala
deleted file mode 100644
index caab674..0000000
--- a/core/src/test/scala/unit/kafka/common/TopicTest.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-import org.junit.Assert._
-import collection.mutable.ArrayBuffer
-import org.junit.Test
-
-class TopicTest {
-
-  @Test
-  def testInvalidTopicNames() {
-    val invalidTopicNames = new ArrayBuffer[String]()
-    invalidTopicNames += ("", ".", "..")
-    var longName = "ATCG"
-    for (_ <- 1 to 6)
-      longName += longName
-    invalidTopicNames += longName
-    invalidTopicNames += longName.drop(6)
-    val badChars = Array('/', '\\', ',', '\u0000', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=')
-    for (weirdChar <- badChars) {
-      invalidTopicNames += "Is" + weirdChar + "illegal"
-    }
-
-    for (i <- invalidTopicNames.indices) {
-      try {
-        Topic.validate(invalidTopicNames(i))
-        fail("Should throw InvalidTopicException.")
-      }
-      catch {
-        case _: org.apache.kafka.common.errors.InvalidTopicException => // This is good.
-      }
-    }
-
-    val validTopicNames = new ArrayBuffer[String]()
-    validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_.", longName.drop(7))
-    for (i <- validTopicNames.indices) {
-      try {
-        Topic.validate(validTopicNames(i))
-      }
-      catch {
-        case _: Exception => fail("Should not throw exception.")
-      }
-    }
-  }
-
-  @Test
-  def testTopicHasCollisionChars() = {
-    val falseTopics = List("start", "end", "middle", "many")
-    val trueTopics = List(
-      ".start", "end.", "mid.dle", ".ma.ny.",
-      "_start", "end_", "mid_dle", "_ma_ny."
-    )
-
-    falseTopics.foreach( t =>
-      assertFalse(Topic.hasCollisionChars(t))
-    )
-
-    trueTopics.foreach( t =>
-      assertTrue(Topic.hasCollisionChars(t))
-    )
-  }
-
-  @Test
-  def testTopicHasCollision() = {
-    val periodFirstMiddleLastNone = List(".topic", "to.pic", "topic.", "topic")
-    val underscoreFirstMiddleLastNone = List("_topic", "to_pic", "topic_", "topic")
-
-    // Self
-    periodFirstMiddleLastNone.foreach { t =>
-      assertTrue(Topic.hasCollision(t, t))
-    }
-    underscoreFirstMiddleLastNone.foreach { t =>
-      assertTrue(Topic.hasCollision(t, t))
-    }
-
-    // Same Position
-    periodFirstMiddleLastNone.zip(underscoreFirstMiddleLastNone).foreach { case (t1, t2) =>
-      assertTrue(Topic.hasCollision(t1, t2))
-    }
-
-    // Different Position
-    periodFirstMiddleLastNone.zip(underscoreFirstMiddleLastNone.reverse).foreach { case (t1, t2) =>
-      assertFalse(Topic.hasCollision(t1, t2))
-    }
-  }
-}

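The deleted suite above pins down three behaviours that the consolidated Java class keeps: validate rejects empty names, ".", "..", names longer than 249 characters, and any character outside [a-zA-Z0-9._-]; hasCollisionChars flags '.' and '_' because both map to the same character in metric names; and hasCollision treats two names as colliding when they are equal after mapping '.' to '_'. A compact Scala sketch of those semantics (TopicRules is a hypothetical name; the real implementation is org.apache.kafka.common.internals.Topic, and 249 is the bound the longName cases imply: "ATCG" doubled six times is 256 characters, still invalid after drop(6) but valid after drop(7)):

object TopicRules {
  val MaxNameLength = 249 // drop(7) of the 256-char longName is the longest valid case

  def isValid(name: String): Boolean =
    name.nonEmpty && name != "." && name != ".." &&
      name.length <= MaxNameLength &&
      name.matches("[a-zA-Z0-9._-]+")

  // '.' and '_' are interchangeable in metric names, so either one is flagged.
  def hasCollisionChars(name: String): Boolean =
    name.contains(".") || name.contains("_")

  // Collision == equality after normalising '.' to '_', which matches the
  // Same Position (true) and Different Position (false) cases above.
  def hasCollision(a: String, b: String): Boolean =
    a.replace('.', '_') == b.replace('.', '_')
}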
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index b1e1274..8de4a89 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -18,10 +18,10 @@
 package kafka.consumer
 
 
+import org.apache.kafka.common.internals.Topic
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
-import kafka.common.Topic
 
 class TopicFilterTest extends JUnitSuite {
 
@@ -36,8 +36,8 @@ class TopicFilterTest extends JUnitSuite {
 
     val topicFilter2 = Whitelist(".+")
     assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
-    assertFalse(topicFilter2.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = true))
-    assertTrue(topicFilter2.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = false))
+    assertFalse(topicFilter2.isTopicAllowed(Topic.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
+    assertTrue(topicFilter2.isTopicAllowed(Topic.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
 
     val topicFilter3 = Whitelist("white_listed-topic.+")
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@@ -56,8 +56,8 @@ class TopicFilterTest extends JUnitSuite {
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
 
-    assertFalse(topicFilter1.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = true))
-    assertTrue(topicFilter1.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = false))
+    assertFalse(topicFilter1.isTopicAllowed(Topic.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
+    assertTrue(topicFilter1.isTopicAllowed(Topic.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
   }
 
   @Test

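The whitelist assertions above separate two independent checks: the regex match and the internal-topic veto. A one-method sketch of that decision (WhitelistSketch is hypothetical; the real class is kafka.consumer.Whitelist, and a blacklist would simply negate the regex half):

import org.apache.kafka.common.internals.Topic

object WhitelistSketch {
  // ".+" admits every topic name, yet internal topics such as
  // __consumer_offsets are still vetoed while excludeInternalTopics is true.
  def isTopicAllowed(pattern: String, topic: String, excludeInternalTopics: Boolean): Boolean =
    topic.matches(pattern) && !(excludeInternalTopics && Topic.isInternal(topic))
}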
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
index bfc9be2..0ace2e7 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
@@ -18,18 +18,19 @@
 package kafka.coordinator.group
 
 
-import kafka.common.{OffsetAndMetadata, Topic}
+import kafka.common.OffsetAndMetadata
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
 import kafka.utils._
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{RecordBatch, MemoryRecords, TimestampType}
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, TimestampType}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import java.util.concurrent.TimeUnit
 
+import org.apache.kafka.common.internals.Topic
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -83,13 +84,13 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     props.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, GroupInitialRebalanceDelay.toString)
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
-    ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
+    ret += (Topic.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1)))
 
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
 
     zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
-    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2))
+    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
     EasyMock.replay(zkUtils)
 
     timer = new MockTimer
@@ -310,7 +311,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 
@@ -783,7 +784,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE, error)
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset))
 
-    val offsetsTopic = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
 
     // Send commit marker.
     groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
@@ -808,7 +809,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE, error)
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset))
 
-    val offsetsTopic = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
 
     // Validate that the pending commit is discarded.
     groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
@@ -832,7 +833,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE, error)
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset))
 
-    val offsetsTopic = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
 
     val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -859,8 +860,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val producerEpoch: Short = 3
 
     val groupIds = List(groupId, otherGroupId)
-    val offsetTopicPartitions = List(new TopicPartition(Topic.GroupMetadataTopicName, groupCoordinator.partitionFor(groupId)),
-      new TopicPartition(Topic.GroupMetadataTopicName, groupCoordinator.partitionFor(otherGroupId)))
+    val offsetTopicPartitions = List(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)),
+      new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(otherGroupId)))
 
     groupCoordinator.groupManager.addPartitionOwnership(offsetTopicPartitions(1).partition)
     val errors = mutable.ArrayBuffer[Errors]()
@@ -937,7 +938,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val producerIds = List(1000L, 1005L)
     val producerEpochs: Seq[Short] = List(3, 4)
 
-    val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupCoordinator.partitionFor(groupId))
+    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
     val errors = mutable.ArrayBuffer[Errors]()
     val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
@@ -1346,7 +1347,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
+        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
         )
       )})
@@ -1428,7 +1429,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
+        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
         )
       )})
@@ -1454,7 +1455,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(Topic.GroupMetadataTopicName, groupCoordinator.partitionFor(groupId)) ->
+        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) ->
           new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
         )
       )})
@@ -1470,7 +1471,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None)
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 

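The expectations above repeat one idiom: capture the callback handed to replicaManager.appendRecords, then fire it by hand with a canned PartitionResponse so the coordinator under test sees a successful append without any log I/O. A self-contained illustration of that capture-and-answer pattern (Appender and CapturePatternDemo are hypothetical; the EasyMock calls are the ones used in the test):

import org.easymock.{Capture, EasyMock, IAnswer}

trait Appender {
  def append(records: String, callback: Int => Unit): Unit
}

object CapturePatternDemo extends App {
  val cb: Capture[Int => Unit] = EasyMock.newCapture()
  val appender = EasyMock.createMock(classOf[Appender])

  // Record the call, grab the callback, and answer by invoking it with 0,
  // standing in for PartitionResponse(Errors.NONE, ...) in the real test.
  EasyMock.expect(appender.append(EasyMock.anyString(), EasyMock.capture(cb)))
    .andAnswer(new IAnswer[Unit] {
      override def answer(): Unit = cb.getValue.apply(0)
    })
  EasyMock.replay(appender)

  appender.append("records", status => println(s"append completed with status $status"))
  EasyMock.verify(appender)
}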
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index ce3b997..f76eb7b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -19,12 +19,11 @@ package kafka.coordinator.group
 
 import kafka.api.ApiVersion
 import kafka.cluster.Partition
-import kafka.common.{OffsetAndMetadata, Topic}
+import kafka.common.OffsetAndMetadata
 import kafka.log.{Log, LogAppendInfo}
 import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.TestUtils.fail
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
-
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
@@ -33,9 +32,10 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.{Before, Test}
-
 import java.nio.ByteBuffer
 
+import org.apache.kafka.common.internals.Topic
+
 import scala.collection.JavaConverters._
 import scala.collection._
 
@@ -71,7 +71,7 @@ class GroupMetadataManagerTest {
 
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2))
+    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
     EasyMock.replay(zkUtils)
 
     time = new MockTime
@@ -82,7 +82,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadOffsetsWithoutGroup() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val startOffset = 15L
 
     val committedOffsets = Map(
@@ -110,7 +110,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadTransactionalOffsetsWithoutGroup() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -144,7 +144,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testDoNotLoadAbortedTransactionalOffsetCommits() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -174,7 +174,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testGroupLoadedWithPendingCommits(): Unit = {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -209,7 +209,7 @@ class GroupMetadataManagerTest {
   @Test
   def testLoadWithCommittedAndAbortedTransactionalOffsetCommits() {
     // A test which loads a log with a mix of committed and aborted transactional offset commit messages.
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -254,7 +254,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadWithCommittedAndAbortedAndPendingTransactionalOffsetCommits() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val producerId = 1000L
     val producerEpoch: Short = 2
 
@@ -316,7 +316,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadTransactionalOffsetCommitsFromMultipleProducers(): Unit = {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val firstProducerId = 1000L
     val firstProducerEpoch: Short = 2
     val secondProducerId = 1001L
@@ -386,7 +386,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadOffsetsWithTombstones() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val startOffset = 15L
 
     val tombstonePartition = new TopicPartition("foo", 1)
@@ -421,7 +421,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadOffsetsAndGroup() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val startOffset = 15L
     val committedOffsets = Map(
       new TopicPartition("foo", 0) -> 23L,
@@ -454,7 +454,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testLoadGroupWithTombstone() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val startOffset = 15L
 
     val memberId = "98098230493"
@@ -478,7 +478,7 @@ class GroupMetadataManagerTest {
     // 1. the group exists at some point in time, but is later removed (because all members left)
     // 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets
 
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
     val startOffset = 15L
 
     val committedOffsets = Map(
@@ -765,7 +765,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -817,7 +817,7 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition))
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
       isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -865,7 +865,7 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition))
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
       isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -920,7 +920,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -997,7 +997,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition))
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -1044,7 +1044,7 @@ class GroupMetadataManagerTest {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
+        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
         )
       )})

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
index 83cba71..77577cf 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
@@ -18,11 +18,11 @@ package kafka.coordinator.transaction
 
 import java.util.Properties
 
-import kafka.common.Topic
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.utils.Utils
@@ -41,7 +41,7 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness {
 
   @Test
   def shouldCommitTransaction(): Unit = {
-    TestUtils.createTopic(zkUtils, Topic.TransactionStateTopicName, 1, 1, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
+    TestUtils.createTopic(zkUtils, Topic.TRANSACTION_STATE_TOPIC_NAME, 1, 1, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
     val topic = "foo"
     TestUtils.createTopic(this.zkUtils, topic, 1, 1, servers)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 0250f60..0d3263a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -18,13 +18,12 @@ package kafka.coordinator.transaction
 
 import java.nio.ByteBuffer
 
-import kafka.common.Topic
-import kafka.common.Topic.TransactionStateTopicName
 import kafka.log.Log
 import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.{MockScheduler, Pool, ZkUtils}
 import kafka.utils.TestUtils.fail
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.IsolationLevel
@@ -44,7 +43,7 @@ class TransactionStateManagerTest {
   val partitionId = 0
   val numPartitions = 2
   val transactionTimeoutMs: Int = 1000
-  val topicPartition = new TopicPartition(TransactionStateTopicName, partitionId)
+  val topicPartition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId)
   val coordinatorEpoch = 10
 
   val txnRecords: mutable.ArrayBuffer[SimpleRecord] = mutable.ArrayBuffer[SimpleRecord]()
@@ -54,7 +53,7 @@ class TransactionStateManagerTest {
   val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
   val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
 
-  EasyMock.expect(zkUtils.getTopicPartitionCount(TransactionStateTopicName))
+  EasyMock.expect(zkUtils.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME))
     .andReturn(Some(numPartitions))
     .anyTimes()
 
@@ -403,7 +402,7 @@ class TransactionStateManagerTest {
       EasyMock.capture(capturedArgument)))
       .andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = capturedArgument.getValue.apply(
-          Map(new TopicPartition(Topic.TransactionStateTopicName, partitionId) ->
+          Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) ->
             new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
           )
         )

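Two details of the setup above are easy to miss: createNiceMock makes unexpected ZkUtils calls return defaults rather than fail, and anyTimes() lets the state manager look up the partition count as often as it likes without the test over-specifying call counts. A sketch of that stub in isolation (PartitionCountStubSketch is a hypothetical wrapper; the mocked call matches the one in the test):

import kafka.utils.ZkUtils
import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
import org.easymock.EasyMock

object PartitionCountStubSketch extends App {
  // Nice mock: unstubbed methods return null/0/false instead of throwing.
  val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
  EasyMock.expect(zkUtils.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME))
    .andReturn(Some(2))
    .anyTimes() // the lookup may happen repeatedly during loading
  EasyMock.replay(zkUtils)

  println(zkUtils.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME)) // Some(2)
}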
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index ed0e805..fdc9a95 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -17,15 +17,16 @@
 
 package kafka.server
 
-import java.util.{Properties}
+import java.util.Properties
 
-import kafka.common.Topic
 import kafka.utils.TestUtils
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.kafka.test.TestUtils.isValidClusterId
+
 import scala.collection.JavaConverters._
 
 class MetadataRequestTest extends BaseRequestTest {
@@ -80,7 +81,7 @@ class MetadataRequestTest extends BaseRequestTest {
 
   @Test
   def testIsInternal() {
-    val internalTopic = Topic.GroupMetadataTopicName
+    val internalTopic = Topic.GROUP_METADATA_TOPIC_NAME
     val notInternalTopic = "notInternal"
     // create the topics
     TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers)

