kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1363022 - /incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
Date Wed, 18 Jul 2012 17:16:28 GMT
Author: jkreps
Date: Wed Jul 18 17:16:28 2012
New Revision: 1363022

URL: http://svn.apache.org/viewvc?rev=1363022&view=rev
Log:
KAFKA-371 Patch from Jonathan Creasy reviewed by me. Correctly handle the case of an empty
string as topic name or invalid partition number.

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1363022&r1=1363021&r2=1363022&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Wed Jul 18
17:16:28 2012
@@ -42,7 +42,6 @@ private[kafka] class LogManager(val conf
   private val flushInterval = config.flushInterval
   private val topicPartitionsMap = config.topicPartitionsMap
   private val logCreationLock = new Object
-  private val random = new java.util.Random
   private val startupLatch: CountDownLatch = new CountDownLatch(1)
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
@@ -111,6 +110,14 @@ private[kafka] class LogManager(val conf
    * Create a log for the given topic and the given partition
    */
   private def createLog(topic: String, partition: Int): Log = {
+    if (topic.length <= 0)
+      throw new InvalidTopicException("topic name can't be emtpy")
+    if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions))
{
+      val error = "Wrong partition %d, valid partitions (0, %d)."
+        .format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
+      warn(error)
+      throw new InvalidPartitionException(error)
+    }
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
@@ -134,13 +141,6 @@ private[kafka] class LogManager(val conf
     logs.get(topic)
   }
 
-  /**
-   * Pick a random partition from the given topic
-   */
-  def chooseRandomPartition(topic: String): Int = {
-    random.nextInt(topicPartitionsMap.getOrElse(topic, numPartitions))
-  }
-
   def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
     val log = getLog(offsetRequest.topic, offsetRequest.partition)
     log match {



Mime
View raw message