kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5118: Improve message for Kafka failed startup with non-Kafka data in data.dirs
Date Mon, 15 May 2017 08:42:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 758e3aa7e -> e2d4198c4


KAFKA-5118: Improve message for Kafka failed startup with non-Kafka data in data.dirs

Explicitly throw a clear exception when Kafka is started with non-Kafka data in
data.dirs.

Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2907 from amethystic/kafka-5118_improve_msg__for_failed_startup_with_nonKafka_data


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e2d4198c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e2d4198c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e2d4198c

Branch: refs/heads/0.10.2
Commit: e2d4198c4ca980c5a5743af043ed2abd08792f30
Parents: 758e3aa
Author: Xi Hu <huxi_2b@hotmail.com>
Authored: Thu May 4 11:46:06 2017 -0700
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon May 15 09:19:55 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala          |  5 ++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala | 19 +++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e2d4198c/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 417122c..029e1ed 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1129,7 +1129,8 @@ object Log {
 
     def exception(dir: File): KafkaException = {
       new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
-        "'" + dir.getName + "' is not in the form of topic-partition\n" +
+        "'" + dir.getName + "' is not in the form of topic-partition or " +
+        "ongoing-deleting directory(topic-partition.uniqueId-delete)\n" +
         "If a directory does not contain Kafka topic data it should not exist in Kafka's log " +
         "directory")
     }
@@ -1137,6 +1138,8 @@ object Log {
     val dirName = dir.getName
     if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
       throw exception(dir)
+    if (dirName.endsWith(DeleteDirSuffix) && !dirName.matches("^(\\S+)-(\\S+)\\.(\\S+)" + DeleteDirSuffix))
+      throw exception(dir)
 
     val name: String =
       if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.indexOf('.'))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e2d4198c/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9e0deb2..cc46fe6 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 
 import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
 import kafka.api.ApiVersion
+import kafka.common.KafkaException
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
@@ -1161,6 +1162,24 @@ class LogTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testParseTopicPartitionNameForExistingInvalidDir() {
+    val dir1 = new File(logDir + "/non_kafka_dir")
+    try {
+      Log.parseTopicPartitionName(dir1)
+      fail("KafkaException should have been thrown for dir: " + dir1.getCanonicalPath)
+    } catch {
+      case _: KafkaException => // should only throw KafkaException
+    }
+    val dir2 = new File(logDir + "/non_kafka_dir-delete")
+    try {
+      Log.parseTopicPartitionName(dir2)
+      fail("KafkaException should have been thrown for dir: " + dir2.getCanonicalPath)
+    } catch {
+      case _: KafkaException => // should only throw KafkaException
+    }
+  }
+
   def topicPartitionName(topic: String, partition: String): String =
     File.separator + topic + "-" + partition
 


Mime
View raw message