kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1383739 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/log/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/integration/ test/scala/unit/kafka/server/ test/scala/unit/kafka/utils/
Date Wed, 12 Sep 2012 02:14:38 GMT
Author: junrao
Date: Wed Sep 12 02:14:37 2012
New Revision: 1383739

URL: http://svn.apache.org/viewvc?rev=1383739&view=rev
Log:
Handle topic names with / on Kafka server (0.8 branch); patched by Swapnil Ghike; reviewed
by Jay Kreps, Joel Koshy and Jun Rao; KAFKA-495

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1383739&r1=1383738&r2=1383739&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
Wed Sep 12 02:14:37 2012
@@ -18,7 +18,7 @@
 package kafka.admin
 
 import joptsimple.OptionParser
-import kafka.utils.{Logging, Utils, ZKStringSerializer, ZkUtils}
+import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import scala.collection.mutable
 
@@ -51,6 +51,11 @@ object CreateTopicCommand extends Loggin
                                         "broker_id_for_part2_replica1 : broker_id_for_part2_replica2
, ...")
                            .ofType(classOf[String])
                            .defaultsTo("")
+    val maxTopicNameLenOpt = parser.accepts("max-name-len", "maximum length of the topic name")
+                           .withRequiredArg
+                           .describedAs("max topic name length")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(Topic.maxNameLength)
 
     val options = parser.parse(args : _*)
 
@@ -63,6 +68,7 @@ object CreateTopicCommand extends Loggin
     }
 
     val topic = options.valueOf(topicOpt)
+    val maxTopicNameLength = options.valueOf(maxTopicNameLenOpt).intValue
     val zkConnect = options.valueOf(zkConnectOpt)
     val nPartitions = options.valueOf(nPartitionsOpt).intValue
     val replicationFactor = options.valueOf(replicationFactorOpt).intValue
@@ -70,7 +76,8 @@ object CreateTopicCommand extends Loggin
     var zkClient: ZkClient = null
     try {
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
+      val topicNameValidator = new TopicNameValidator(maxTopicNameLength)
+      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr, topicNameValidator)
       println("creation succeeded!")
     } catch {
       case e =>
@@ -82,7 +89,10 @@ object CreateTopicCommand extends Loggin
     }
   }
 
-  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
+  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "",
+                  topicNameValidator: TopicNameValidator = new TopicNameValidator(Topic.maxNameLength)) {
+    topicNameValidator.validate(topic)
+
     val brokerList = ZkUtils.getSortedBrokerList(zkClient)
 
     val partitionReplicaAssignment = if (replicaAssignmentStr == "")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1383739&r1=1383738&r2=1383739&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Wed Sep 12
02:14:37 2012
@@ -23,7 +23,7 @@ import scala.collection._
 import kafka.server.KafkaConfig
 import kafka.api.OffsetRequest
 import kafka.log.Log._
-import kafka.common.{KafkaException, InvalidTopicException, UnknownTopicOrPartitionException}
+import kafka.common.{KafkaException, UnknownTopicOrPartitionException}
 
 /**
  * The guy who creates and hands out logs
@@ -95,8 +95,6 @@ private[kafka] class LogManager(val conf
    * Create a log for the given topic and the given partition
    */
   private def createLog(topic: String, partition: Int): Log = {
-    if (topic.length <= 0)
-      throw new InvalidTopicException("Topic name can't be emtpy")
     if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
       val error = "Wrong partition %d, valid partitions (0, %d)."
               .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1383739&r1=1383738&r2=1383739&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Sep
12 02:14:37 2012
@@ -23,7 +23,7 @@ import kafka.api._
 import kafka.common._
 import kafka.message._
 import kafka.network._
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{TopicNameValidator, Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
@@ -45,6 +45,7 @@ class KafkaApis(val requestChannel: Requ
   private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
   private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
   private val delayedRequestMetrics = new DelayedRequestMetrics
+  private val topicNameValidator = new TopicNameValidator(replicaManager.config.maxTopicNameLength)
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
   this.logIdent = "[KafkaApi on Broker " + brokerId + "], "
@@ -419,7 +420,8 @@ class KafkaApis(val requestChannel: Requ
             case ErrorMapping.UnknownTopicOrPartitionCode =>
               /* check if auto creation of topics is turned on */
               if(config.autoCreateTopics) {
-                CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+                CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor,
+                                               topicNameValidator = topicNameValidator)
                 info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                              .format(topic, config.numPartitions, config.defaultReplicationFactor))
                 val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1383739&r1=1383738&r2=1383739&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Wed Sep
12 02:14:37 2012
@@ -21,7 +21,7 @@ import java.util.Properties
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import java.net.InetAddress
-import kafka.utils.{Utils, VerifiableProperties, ZKConfig}
+import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig}
 
 /**
  * Configuration settings for the kafka server
@@ -118,6 +118,9 @@ class KafkaConfig private (val props: Ve
   /* enable auto creation of topic on the server */
   val autoCreateTopics = props.getBoolean("auto.create.topics", true)
 
+  /* the maximum length of topic name*/
+  val maxTopicNameLength = props.getIntInRange("max.topic.name.length", Topic.maxNameLength, (1, Int.MaxValue))
+
   /**
    * Following properties are relevant to Kafka replication
    */

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala?rev=1383739&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Topic.scala Wed Sep 12 02:14:37
2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.common.InvalidTopicException
+import util.matching.Regex
+
+object Topic {
+  val maxNameLength = 255
+  val illegalChars = "/" + '\u0000' + '\u0001' + "-" + '\u001F' + '\u007F' + "-" + '\u009F' +
+                     '\uD800' + "-" + '\uF8FF' + '\uFFF0' + "-" + '\uFFFF'
+}
+
+class TopicNameValidator(maxLen: Int) {
+  // Regex checks for illegal chars and "." and ".." filenames
+  private val rgx = new Regex("(^\\.{1,2}$)|[" + Topic.illegalChars + "]")
+
+  def validate(topic: String) {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name is illegal, can't be empty")
+    else if (topic.length > maxLen)
+      throw new InvalidTopicException("topic name is illegal, can't be longer than " + maxLen + " characters")
+
+    rgx.findFirstIn(topic) match {
+      case Some(t) => throw new InvalidTopicException("topic name " + topic + " is illegal, doesn't match expected regular expression")
+      case None =>
+    }
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1383739&r1=1383738&r2=1383739&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
Wed Sep 12 02:14:37 2012
@@ -100,7 +100,7 @@ class TopicMetadataTest extends JUnit3Su
   private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = {
     // topic metadata request only requires 1 call from the replica manager
     val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
-    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head).times(2)
     EasyMock.replay(replicaManager)
 
     // create a topic metadata request

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1383739&r1=1383738&r2=1383739&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
Wed Sep 12 02:14:37 2012
@@ -78,6 +78,7 @@ class SimpleFetchTest extends JUnit3Suit
     partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L
 
     EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
     EasyMock.replay(replicaManager)
 
@@ -151,6 +152,7 @@ class SimpleFetchTest extends JUnit3Suit
     partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long]
 
     EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO))
     EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala?rev=1383739&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TopicTest.scala Wed
Sep 12 02:14:37 2012
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import junit.framework.Assert._
+import collection.mutable.ArrayBuffer
+import kafka.common.InvalidTopicException
+import org.junit.Test
+
+class TopicTest {
+
+  @Test
+  def testInvalidTopicNames() {
+    val invalidTopicNames = new ArrayBuffer[String]()
+    invalidTopicNames += ("", ".", "..")
+    var longName = "ATCG"
+    for (i <- 1 to 6)
+      longName += longName
+    invalidTopicNames += longName
+    val badChars = Array('/', '\u0000', '\u0001', '\u0018', '\u001F', '\u008F', '\uD805', '\uFFFA')
+    for (weirdChar <- badChars) {
+      invalidTopicNames += "Is" + weirdChar + "funny"
+    }
+
+    val topicNameValidator = new TopicNameValidator(Topic.maxNameLength)
+
+    for (i <- 0 until invalidTopicNames.size) {
+      try {
+        topicNameValidator.validate(invalidTopicNames(i))
+        fail("Should throw InvalidTopicException.")
+      }
+      catch {
+        case e: InvalidTopicException => "This is good."
+      }
+    }
+  }
+}



Mime
View raw message