kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject kafka git commit: KAFKA-742: Existing directories under the Kafka data directory without any data cause process to not start; reviewed by Neha Narkhede
Date Wed, 17 Dec 2014 04:33:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 523b36589 -> ae0bb84fa


KAFKA-742: Existing directories under the Kafka data directory without any data cause process
to not start; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ae0bb84f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ae0bb84f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ae0bb84f

Branch: refs/heads/trunk
Commit: ae0bb84fa7e599774cd984196dc62b3dc84b13ca
Parents: 523b365
Author: Ashish Singh <asingh@cloudera.com>
Authored: Tue Dec 16 20:33:11 2014 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue Dec 16 20:33:28 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 22 +++++-
 core/src/main/scala/kafka/log/LogManager.scala  |  2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 75 ++++++++++++++++++++
 3 files changed, 95 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ae0bb84f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 4fae2f0..024506c 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -84,7 +84,7 @@ class Log(val dir: File,
   /* Calculate the offset of the next message */
   @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
activeSegment.size.toInt)
 
-  val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
+  val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir)
 
   info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
@@ -832,9 +832,25 @@ object Log {
   /**
    * Parse the topic and partition out of the directory name of a log
    */
-  def parseTopicPartitionName(name: String): TopicAndPartition = {
+  def parseTopicPartitionName(dir: File): TopicAndPartition = {
+    val name: String = dir.getName
+    if (name == null || name.isEmpty || !name.contains('-')) {
+      throwException(dir)
+    }
     val index = name.lastIndexOf('-')
-    TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
+    val topic: String = name.substring(0, index)
+    val partition: String = name.substring(index + 1)
+    if (topic.length < 1 || partition.length < 1) {
+      throwException(dir)
+    }
+    TopicAndPartition(topic, partition.toInt)
+  }
+
+  def throwException(dir: File) {
+    throw new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
+      "'" + dir.getName + "' is not in the form of topic-partition\n" +
+      "If a directory does not contain Kafka topic data it should not exist in Kafka's log
" +
+      "directory")
   }
 }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae0bb84f/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4d2924d..4ebaae0 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -134,7 +134,7 @@ class LogManager(val logDirs: Array[File],
         Utils.runnable {
           debug("Loading log '" + logDir.getName + "'")
 
-          val topicPartition = Log.parseTopicPartitionName(logDir.getName)
+          val topicPartition = Log.parseTopicPartitionName(logDir)
           val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
           val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae0bb84f/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index d670ba7..c2dd8eb 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -688,4 +688,79 @@ class LogTest extends JUnitSuite {
     assertEquals(recoveryPoint, log.logEndOffset)
     cleanShutdownFile.delete()
   }
+
+  @Test
+  def testParseTopicPartitionName() {
+    val topic: String = "test_topic"
+    val partition:String = "143"
+    val dir: File = new File(logDir + topicPartitionName(topic, partition))
+    val topicAndPartition = Log.parseTopicPartitionName(dir);
+    assertEquals(topic, topicAndPartition.asTuple._1)
+    assertEquals(partition.toInt, topicAndPartition.asTuple._2)
+  }
+
+  @Test
+  def testParseTopicPartitionNameForEmptyName() {
+    try {
+      val dir: File = new File("")
+      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
+    } catch {
+      case e: Exception => // its GOOD!
+    }
+  }
+
+  @Test
+  def testParseTopicPartitionNameForNull() {
+    try {
+      val dir: File = null
+      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      fail("KafkaException should have been thrown for dir: " + dir)
+    } catch {
+      case e: Exception => // its GOOD!
+    }
+  }
+
+  @Test
+  def testParseTopicPartitionNameForMissingSeparator() {
+    val topic: String = "test_topic"
+    val partition:String = "1999"
+    val dir: File = new File(logDir + File.separator + topic + partition)
+    try {
+      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
+    } catch {
+      case e: Exception => // its GOOD!
+    }
+  }
+
+  @Test
+  def testParseTopicPartitionNameForMissingTopic() {
+    val topic: String = ""
+    val partition:String = "1999"
+    val dir: File = new File(logDir + topicPartitionName(topic, partition))
+    try {
+      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
+    } catch {
+      case e: Exception => // its GOOD!
+    }
+  }
+
+  @Test
+  def testParseTopicPartitionNameForMissingPartition() {
+    val topic: String = "test_topic"
+    val partition:String = ""
+    val dir: File = new File(logDir + topicPartitionName(topic, partition))
+    try {
+      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
+    } catch {
+      case e: Exception => // its GOOD!
+    }
+  }
+
+  def topicPartitionName(topic: String, partition: String): String = {
+    File.separator + topic + "-" + partition
+  }
 }


Mime
View raw message