kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5118: Improve message for Kafka failed startup with non-Kafka data in data.dirs
Date Thu, 04 May 2017 18:46:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ed59f742b -> a6728f6ee


KAFKA-5118: Improve message for Kafka failed startup with non-Kafka data in data.dirs

Explicitly throw a clear exception when starting up Kafka with non-Kafka data in
data.dirs.

Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2907 from amethystic/kafka-5118_improve_msg__for_failed_startup_with_nonKafka_data


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a6728f6e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a6728f6e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a6728f6e

Branch: refs/heads/trunk
Commit: a6728f6ee7a87da2859baca95b50e00dd6368431
Parents: ed59f74
Author: Xi Hu <huxi_2b@hotmail.com>
Authored: Thu May 4 11:46:06 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu May 4 11:46:06 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala          |  5 ++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala | 19 +++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a6728f6e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 3c88dc8..b7f340f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1362,7 +1362,8 @@ object Log {
 
     def exception(dir: File): KafkaException = {
       new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
-        "'" + dir.getName + "' is not in the form of topic-partition\n" +
+        "'" + dir.getName + "' is not in the form of topic-partition or " +
+        "ongoing-deleting directory(topic-partition.uniqueId-delete)\n" +
         "If a directory does not contain Kafka topic data it should not exist in Kafka's log " +
         "directory")
     }
@@ -1370,6 +1371,8 @@ object Log {
     val dirName = dir.getName
     if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
       throw exception(dir)
+    if (dirName.endsWith(DeleteDirSuffix) && !dirName.matches("^(\\S+)-(\\S+)\\.(\\S+)" + DeleteDirSuffix))
+      throw exception(dir)
 
     val name: String =
       if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.indexOf('.'))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a6728f6e/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index b5e376e..0f82cd3 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -23,6 +23,7 @@ import java.util.Properties
 
 import org.apache.kafka.common.errors._
 import kafka.api.ApiVersion
+import kafka.common.KafkaException
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import kafka.utils._
@@ -1622,6 +1623,24 @@ class LogTest {
     }
   }
 
+  @Test
+  def testParseTopicPartitionNameForExistingInvalidDir() {
+    val dir1 = new File(logDir + "/non_kafka_dir")
+    try {
+      Log.parseTopicPartitionName(dir1)
+      fail("KafkaException should have been thrown for dir: " + dir1.getCanonicalPath)
+    } catch {
+      case _: KafkaException => // should only throw KafkaException
+    }
+    val dir2 = new File(logDir + "/non_kafka_dir-delete")
+    try {
+      Log.parseTopicPartitionName(dir2)
+      fail("KafkaException should have been thrown for dir: " + dir2.getCanonicalPath)
+    } catch {
+      case _: KafkaException => // should only throw KafkaException
+    }
+  }
+
   def topicPartitionName(topic: String, partition: String): String =
     File.separator + topic + "-" + partition
 


Mime
View raw message