kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2832: Add a consumer config option to exclude internal topics
Date Thu, 17 Mar 2016 19:33:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk dce06766d -> 30e78fa00


KAFKA-2832: Add a consumer config option to exclude internal topics

A new consumer config option 'exclude.internal.topics' was added to
allow excluding internal topics when wildcards are used to specify
consumers.
The new option takes a boolean value, with a default 'false' value (i.e.
no exclusion).

This patch is co-authored with rajinisivaram edoardocomar mimaison

Author: edoardo <ecomar@uk.ibm.com>
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Ismael Juma, Jun Rao, Gwen Shapira

Closes #1082 from edoardocomar/KAFKA-2832


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/30e78fa0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/30e78fa0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/30e78fa0

Branch: refs/heads/trunk
Commit: 30e78fa00650b258f3ab5ef6c9bdf5ca137289c0
Parents: dce0676
Author: edoardo <ecomar@uk.ibm.com>
Authored: Thu Mar 17 12:33:47 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Mar 17 12:33:47 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  | 11 ++++++
 .../kafka/clients/consumer/KafkaConsumer.java   |  3 +-
 .../consumer/internals/ConsumerCoordinator.java |  9 +++--
 .../kafka/common/internals/TopicConstants.java  | 33 +++++++++++++++++
 .../internals/ConsumerCoordinatorTest.java      | 37 ++++++++++++++++----
 .../main/scala/kafka/admin/TopicCommand.scala   |  7 ++--
 core/src/main/scala/kafka/common/Topic.scala    |  2 --
 .../main/scala/kafka/consumer/TopicFilter.scala |  5 +--
 .../kafka/coordinator/GroupCoordinator.scala    |  3 --
 .../coordinator/GroupMetadataManager.scala      | 30 ++++++++--------
 .../src/main/scala/kafka/server/KafkaApis.scala | 11 +++---
 .../scala/kafka/server/ReplicaManager.scala     |  5 ++-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  5 ++-
 .../kafka/api/BaseConsumerTest.scala            |  7 ++--
 .../kafka/api/IntegrationTestHarness.scala      |  4 +--
 .../kafka/api/ProducerFailureHandlingTest.scala |  4 +--
 .../unit/kafka/admin/TopicCommandTest.scala     |  9 ++---
 .../unit/kafka/consumer/TopicFilterTest.scala   |  9 ++---
 .../GroupCoordinatorResponseTest.scala          | 14 ++++----
 19 files changed, 135 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index bd9efc3..9101307 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -172,6 +172,12 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
     private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
 
+    /** <code>exclude.internal.topics</code> */
+    public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
+    private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. "
+                                                            + "If set to <code>true</code> the only way to receive records from an internal topic is subscribing to it.";
+    public static final boolean EXCLUDE_INTERNAL_TOPICS_DEFAULT = true;
+    
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
@@ -316,6 +322,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(1),
                                         Importance.MEDIUM,
                                         MAX_POLL_RECORDS_DOC)
+                                .define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
+                                        Type.BOOLEAN,
+                                        EXCLUDE_INTERNAL_TOPICS_DEFAULT,
+                                        Importance.MEDIUM,
+                                        EXCLUDE_INTERNAL_TOPICS_DOC)
 
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d7c8e14..804a160 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -612,7 +612,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                     config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                     config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                    this.interceptors);
+                    this.interceptors,
+                    config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                         Deserializer.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2ae1437..cf93530 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.internals.TopicConstants;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -69,6 +70,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final boolean autoCommitEnabled;
     private final AutoCommitTask autoCommitTask;
     private final ConsumerInterceptors<?, ?> interceptors;
+    private final boolean excludeInternalTopics;
 
     /**
      * Initialize the coordination manager.
@@ -87,7 +89,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                OffsetCommitCallback defaultOffsetCommitCallback,
                                boolean autoCommitEnabled,
                                long autoCommitIntervalMs,
-                               ConsumerInterceptors<?, ?> interceptors) {
+                               ConsumerInterceptors<?, ?> interceptors,
+                               boolean excludeInternalTopics) {
         super(client,
                 groupId,
                 sessionTimeoutMs,
@@ -110,6 +113,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
+        this.excludeInternalTopics = excludeInternalTopics;
     }
 
     @Override
@@ -140,7 +144,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     final List<String> topicsToSubscribe = new ArrayList<>();
 
                     for (String topic : cluster.topics())
-                        if (subscriptions.getSubscribedPattern().matcher(topic).matches())
+                        if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
+                                !(excludeInternalTopics && TopicConstants.INTERNAL_TOPICS.contains(topic)))
                             topicsToSubscribe.add(topic);
 
                     subscriptions.changeSubscription(topicsToSubscribe);

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java b/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
new file mode 100644
index 0000000..5d6b992
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+public final class TopicConstants {
+
+    //avoid instantiation
+    private TopicConstants() {
+    }
+
+    // TODO: we store both group metadata and offset data here despite the topic name being offsets only
+    public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
+    public static final Collection<String> INTERNAL_TOPICS = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(GROUP_METADATA_TOPIC_NAME)));
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 0b8a162..260ee7a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -34,6 +35,7 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.internals.TopicConstants;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -63,6 +65,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -107,7 +110,7 @@ public class ConsumerCoordinatorTest {
         this.partitionAssignor.clear();
 
         client.setNode(node);
-        this.coordinator = buildCoordinator(metrics, assignors);
+        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
     }
 
     @After
@@ -263,7 +266,7 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test(expected = ApiException.class)
-    public void testJoinGroupInvalidGroupId() {
+    public void testJoinGroupInvalidGroupId() { 
         final String consumerId = "leader";
 
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
@@ -509,7 +512,7 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testMetadataChangeTriggersRebalance() {
+    public void testMetadataChangeTriggersRebalance() { 
         final String consumerId = "consumer";
 
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
@@ -533,6 +536,25 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testExcludeInternalTopicsConfigOption() { 
+        subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+
+        metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+    }
+
+    @Test
+    public void testIncludeInternalTopicsConfigOption() {
+        coordinator = buildCoordinator(new Metrics(), assignors, false);
+        subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+
+        metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
+
+        assertTrue(subscriptions.partitionAssignmentNeeded());
+    }
+    
+    @Test
     public void testRejoinGroup() {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
@@ -882,7 +904,7 @@ public class ConsumerCoordinatorTest {
         RangeAssignor range = new RangeAssignor();
 
         try (Metrics metrics = new Metrics(time)) {
-            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range));
+            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range), ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(roundRobin.name(), metadata.get(0).name());
@@ -890,7 +912,7 @@ public class ConsumerCoordinatorTest {
         }
 
         try (Metrics metrics = new Metrics(time)) {
-            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin));
+            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin), ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(range.name(), metadata.get(0).name());
@@ -898,7 +920,7 @@ public class ConsumerCoordinatorTest {
         }
     }
 
-    private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors) {
+    private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors, boolean excludeInternalTopics) {
         return new ConsumerCoordinator(
                 consumerClient,
                 groupId,
@@ -914,7 +936,8 @@ public class ConsumerCoordinatorTest {
                 defaultOffsetCommitCallback,
                 autoCommitEnabled,
                 autoCommitIntervalMs,
-                null);
+                null,
+                excludeInternalTopics);
     }
 
     private Struct consumerMetadataResponse(Node node, short error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index e89e09d..b3b0635 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -18,7 +18,6 @@
 package kafka.admin
 
 import java.util.Properties
-
 import joptsimple._
 import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException}
 import kafka.consumer.{ConsumerConfig, Whitelist}
@@ -30,9 +29,9 @@ import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
-
 import scala.collection.JavaConversions._
 import scala.collection._
+import org.apache.kafka.common.internals.TopicConstants
 
 
 object TopicCommand extends Logging {
@@ -138,7 +137,7 @@ object TopicCommand extends Logging {
       }
 
       if(opts.options.has(opts.partitionsOpt)) {
-        if (topic == GroupCoordinator.GroupMetadataTopicName) {
+        if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
           throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
         }
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +
@@ -171,7 +170,7 @@ object TopicCommand extends Logging {
     }
     topics.foreach { topic =>
       try {
-        if (Topic.InternalTopics.contains(topic)) {
+        if (TopicConstants.INTERNAL_TOPICS.contains(topic)) {
           throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
         } else {
           zkUtils.createPersistentPath(getDeleteTopicPath(topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 55d2bdb..930d0e4 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -25,8 +25,6 @@ object Topic {
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + "+")
 
-  val InternalTopics = Set(GroupCoordinator.GroupMetadataTopicName)
-
   def validate(topic: String) {
     if (topic.length <= 0)
       throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be empty")

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 5a13540..b89968e 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -21,6 +21,7 @@ package kafka.consumer
 import kafka.utils.Logging
 import java.util.regex.{PatternSyntaxException, Pattern}
 import kafka.common.Topic
+import org.apache.kafka.common.internals.TopicConstants
 
 
 sealed abstract class TopicFilter(rawRegex: String) extends Logging {
@@ -47,7 +48,7 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
 
 case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
   override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
-    val allowed = topic.matches(regex) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
+    val allowed = topic.matches(regex) && !(TopicConstants.INTERNAL_TOPICS.contains(topic) && excludeInternalTopics)
 
     debug("%s %s".format(
       topic, if (allowed) "allowed" else "filtered"))
@@ -60,7 +61,7 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
 
 case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
   override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
-    val allowed = (!topic.matches(regex)) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
+    val allowed = (!topic.matches(regex)) && !(TopicConstants.INTERNAL_TOPICS.contains(topic) && excludeInternalTopics)
 
     debug("%s %s".format(
       topic, if (allowed) "allowed" else "filtered"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 36d7bbb..30a3a78 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -725,9 +725,6 @@ object GroupCoordinator {
   val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
   val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
 
-  // TODO: we store both group metadata and offset data here despite the topic name being offsets only
-  val GroupMetadataTopicName = "__consumer_offsets"
-
   def apply(config: KafkaConfig,
             zkUtils: ZkUtils,
             replicaManager: ReplicaManager,

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 2c0236e..c6bc44e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -18,7 +18,6 @@
 package kafka.coordinator
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
-
 import kafka.utils.CoreUtils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
@@ -40,14 +39,13 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import kafka.common.MessageFormatter
 import kafka.server.ReplicaManager
-
 import scala.collection._
 import java.io.PrintStream
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.TimeUnit
-
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.internals.TopicConstants
 
 case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
                         callback: Map[TopicPartition, PartitionResponse] => Unit)
@@ -147,9 +145,9 @@ class GroupMetadataManager(val brokerId: Int,
       val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
         timestamp = timestamp, magicValue = magicValue)
 
-      val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+      val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
       partitionOpt.foreach { partition =>
-        val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+        val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
 
         trace("Marking group %s as deleted.".format(group.groupId))
 
@@ -177,7 +175,7 @@ class GroupMetadataManager(val brokerId: Int,
       timestamp = timestamp,
       magicValue = magicValue)
 
-    val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
+    val groupMetadataPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
 
     val groupMetadataMessageSet = Map(groupMetadataPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
@@ -263,7 +261,7 @@ class GroupMetadataManager(val brokerId: Int,
       )
     }.toSeq
 
-    val offsetTopicPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))
+    val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(groupId))
 
     val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -351,7 +349,7 @@ class GroupMetadataManager(val brokerId: Int,
    */
   def loadGroupsForPartition(offsetsPartition: Int,
                              onGroupLoaded: GroupMetadata => Unit) {
-    val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+    val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
     scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
 
     def loadGroupsAndOffsets() {
@@ -470,7 +468,7 @@ class GroupMetadataManager(val brokerId: Int,
    */
   def removeGroupsForPartition(offsetsPartition: Int,
                                onGroupUnloaded: GroupMetadata => Unit) {
-    val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+    val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
     scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
 
     def removeGroupsAndOffsets() {
@@ -507,10 +505,10 @@ class GroupMetadataManager(val brokerId: Int,
       }
 
       if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
-        .format(numOffsetsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+        .format(numOffsetsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
 
       if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
-        .format(numGroupsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+        .format(numGroupsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
     }
   }
 
@@ -566,9 +564,9 @@ class GroupMetadataManager(val brokerId: Int,
       // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
       // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
       tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
-        val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+        val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
         partitionOpt.map { partition =>
-          val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+          val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
           val messages = tombstones.map(_._2).toSeq
 
           trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
@@ -593,7 +591,7 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
-    val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, partitionId)
+    val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionId)
 
     val hw = partitionOpt.map { partition =>
       partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -621,7 +619,7 @@ class GroupMetadataManager(val brokerId: Int,
    * If the topic does not exist, the configured partition count is returned.
    */
   private def getOffsetsTopicPartitionCount = {
-    val topic = GroupCoordinator.GroupMetadataTopicName
+    val topic = TopicConstants.GROUP_METADATA_TOPIC_NAME
     val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
     if (topicData(topic).nonEmpty)
       topicData(topic).size
@@ -630,7 +628,7 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
-    val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition)
+    val groupMetadataTopicAndPartition = new TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partition)
     val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
       throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 452f721..0fb4d74 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,6 +45,7 @@ MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, Of
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Node}
+import org.apache.kafka.common.internals.TopicConstants
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -129,11 +130,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         // this callback is invoked under the replica state change lock to ensure proper order of
         // leadership changes
         updatedLeaders.foreach { partition =>
-          if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
+          if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
             coordinator.handleGroupImmigration(partition.partitionId)
         }
         updatedFollowers.foreach { partition =>
-          if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
+          if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
             coordinator.handleGroupEmigration(partition.partitionId)
         }
       }
@@ -643,12 +644,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
       else
         config.offsetsTopicReplicationFactor.toInt
-    createTopic(GroupCoordinator.GroupMetadataTopicName, config.offsetsTopicPartitions,
+    createTopic(TopicConstants.GROUP_METADATA_TOPIC_NAME, config.offsetsTopicPartitions,
       offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
   }
 
   private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), securityProtocol)
+    val topicMetadata = metadataCache.getTopicMetadata(Set(TopicConstants.GROUP_METADATA_TOPIC_NAME), securityProtocol)
     topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
   }
 
@@ -659,7 +660,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == GroupCoordinator.GroupMetadataTopicName) {
+        if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
           createGroupMetadataTopic()
         } else if (config.autoCreateTopicsEnable) {
           createTopic(topic, config.numPartitions, config.defaultReplicationFactor)

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index de58e56..f050e27 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,6 @@ package kafka.server
 import java.io.{File, IOException}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
@@ -38,9 +37,9 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time => JTime}
-
 import scala.collection._
 import scala.collection.JavaConverters._
+import org.apache.kafka.common.internals.TopicConstants
 
 /*
  * Result metadata of a log append operation on the log
@@ -395,7 +394,7 @@ class ReplicaManager(val config: KafkaConfig,
       BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
 
       // reject appending to internal topics if it is not allowed
-      if (Topic.InternalTopics.contains(topicPartition.topic) && !internalTopicsAllowed) {
+      if (TopicConstants.INTERNAL_TOPICS.contains(topicPartition.topic) && !internalTopicsAllowed) {
         (topicPartition, LogAppendResult(
           LogAppendInfo.UnknownLogAppendInfo,
           Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic)))))

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b09c541..fad7657 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -17,7 +17,6 @@ import java.net.Socket
 import java.nio.ByteBuffer
 import java.util.concurrent.ExecutionException
 import java.util.{ArrayList, Collections, Properties}
-
 import kafka.cluster.EndPoint
 import kafka.common.TopicAndPartition
 import kafka.coordinator.GroupCoordinator
@@ -34,10 +33,10 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
+import org.apache.kafka.common.internals.TopicConstants
 
 class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   val topic = "topic"
@@ -143,7 +142,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, GroupCoordinator.GroupMetadataTopicName,
+    TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
       1,
       1,
       servers,

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 684b38f..f576be5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -13,23 +13,20 @@
 package kafka.api
 
 import java.util
-
 import kafka.coordinator.GroupCoordinator
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
-
 import kafka.utils.{TestUtils, Logging, ShutdownableThread}
 import kafka.server.KafkaConfig
-
 import java.util.ArrayList
 import org.junit.Assert._
 import org.junit.{Before, Test}
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
+import org.apache.kafka.common.internals.TopicConstants
 
 /**
  * Integration tests for the new consumer that cover basic usage as well as server failures
@@ -196,7 +193,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     // get metadata for the topic
     var parts: Seq[PartitionInfo] = null
     while (parts == null)
-      parts = consumer0.partitionsFor(GroupCoordinator.GroupMetadataTopicName).asScala
+      parts = consumer0.partitionsFor(TopicConstants.GROUP_METADATA_TOPIC_NAME).asScala
     assertEquals(1, parts.size)
     assertNotNull(parts(0).leader())
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index b4f31c4..d0680b8 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -24,10 +24,10 @@ import java.util.Properties
 import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
-
 import org.junit.{After, Before}
 import scala.collection.mutable.Buffer
 import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants
 
 /**
  * A helper class for writing integration tests that involve producers, consumers, and servers
@@ -75,7 +75,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     }
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, GroupCoordinator.GroupMetadataTopicName,
+    TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 63a6b6f..2bb203d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -19,7 +19,6 @@ package kafka.api
 
 import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}
 import java.util.{Properties, Random}
-
 import kafka.common.Topic
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
@@ -31,6 +30,7 @@ import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import org.apache.kafka.common.internals.TopicConstants
 
 class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
@@ -198,7 +198,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testCannotSendToInternalTopic() {
     val thrown = intercept[ExecutionException] {
-      producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
+      producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](TopicConstants.INTERNAL_TOPICS.iterator.next, "test".getBytes, "test".getBytes)).get
     }
     assertTrue("Unexpected exception while sending to an invalid topic " + thrown.getCause, thrown.getCause.isInstanceOf[InvalidTopicException])
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index b42aaf4..e0107da 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -26,6 +26,7 @@ import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils._
 import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants
 
 class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
@@ -86,12 +87,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
-      "--topic", GroupCoordinator.GroupMetadataTopicName))
+      "--topic", TopicConstants.GROUP_METADATA_TOPIC_NAME))
     TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
 
-    // try to delete the GroupCoordinator.GroupMetadataTopicName and make sure it doesn't
-    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", GroupCoordinator.GroupMetadataTopicName))
-    val deleteOffsetTopicPath = getDeleteTopicPath(GroupCoordinator.GroupMetadataTopicName)
+    // try to delete the TopicConstants.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
+    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", TopicConstants.GROUP_METADATA_TOPIC_NAME))
+    val deleteOffsetTopicPath = getDeleteTopicPath(TopicConstants.GROUP_METADATA_TOPIC_NAME)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
         TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 1e8d04e..0e0a06a 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -22,6 +22,7 @@ import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants
 
 
 class TopicFilterTest extends JUnitSuite {
@@ -37,8 +38,8 @@ class TopicFilterTest extends JUnitSuite {
 
     val topicFilter2 = new Whitelist(".+")
     assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
-    assertFalse(topicFilter2.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = true))
-    assertTrue(topicFilter2.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = false))
+    assertFalse(topicFilter2.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
+    assertTrue(topicFilter2.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
 
     val topicFilter3 = new Whitelist("white_listed-topic.+")
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@@ -57,8 +58,8 @@ class TopicFilterTest extends JUnitSuite {
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
 
-    assertFalse(topicFilter1.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = true))
-    assertTrue(topicFilter1.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = false))
+    assertFalse(topicFilter1.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
+    assertTrue(topicFilter1.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 50fa09e..acdb660 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -19,7 +19,6 @@ package kafka.coordinator
 
 import org.apache.kafka.common.record.Record
 import org.junit.Assert._
-
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 import kafka.message.{Message, MessageSet}
 import kafka.server.{ReplicaManager, KafkaConfig}
@@ -32,12 +31,11 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, IAnswer, EasyMock}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
-
 import java.util.concurrent.TimeUnit
-
 import scala.collection._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future, Promise}
+import org.apache.kafka.common.internals.TopicConstants
 
 /**
  * Test GroupCoordinator responses
@@ -81,12 +79,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
-    ret += (GroupCoordinator.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
+    ret += (TopicConstants.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1)))
 
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
 
     zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
+    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret)
     EasyMock.replay(zkUtils)
 
     groupCoordinator = GroupCoordinator(KafkaConfig.fromProps(props), zkUtils, replicaManager, new SystemTime)
@@ -834,7 +832,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+        Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
         )
       )})
@@ -911,7 +909,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+        Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
           new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
         )
       )})
@@ -925,7 +923,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
-    EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
+    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andReturn(None)
     EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
     EasyMock.replay(replicaManager)
 


Mime
View raw message