kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5759; Allow user to specify relative path as log directory
Date Fri, 01 Sep 2017 16:38:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 51af6fb65 -> bc999989b


KAFKA-5759; Allow user to specify relative path as log directory

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jiangjie Qin <becket.qin@gmail.com>

Closes #3709 from lindong28/KAFKA-5759


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc999989
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc999989
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc999989

Branch: refs/heads/trunk
Commit: bc999989bff39db2f5e7b07968988ed9ccba49b2
Parents: 51af6fb
Author: Dong Lin <lindong28@gmail.com>
Authored: Fri Sep 1 09:38:27 2017 -0700
Committer: Jiangjie Qin <becket.qin@gmail.com>
Committed: Fri Sep 1 09:38:27 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogManager.scala        | 6 ++++--
 core/src/main/scala/kafka/server/KafkaServer.scala    | 6 +++---
 core/src/main/scala/kafka/server/ReplicaManager.scala | 1 +
 core/src/test/scala/unit/kafka/utils/TestUtils.scala  | 6 +++++-
 4 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc999989/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4068001..2377497 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -153,6 +153,7 @@ class LogManager(logDirs: Array[File],
     liveLogDirs
   }
 
+  // dir should be an absolute path
   def handleLogDirFailure(dir: String) {
     info(s"Stopping serving logs in dir $dir")
     logCreationOrDeletionLock synchronized {
@@ -701,6 +702,7 @@ class LogManager(logDirs: Array[File],
     }
   }
 
+  // logDir should be an absolute path
   def isLogDirOnline(logDir: String): Boolean = {
     if (!logDirs.exists(_.getAbsolutePath == logDir))
       throw new RuntimeException(s"Log dir $logDir is not found in the config.")
@@ -758,8 +760,8 @@ object LogManager {
       backOffMs = config.logCleanerBackoffMs,
       enableCleaner = config.logCleanerEnable)
 
-    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
-      initialOfflineDirs = initialOfflineDirs.map(new File(_)).toArray,
+    new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile).toArray,
+      initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile).toArray,
       topicConfigs = topicConfigs,
       defaultConfig = defaultLogConfig,
       cleanerConfig = cleanerConfig,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc999989/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1270e2f..ec3abff 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -618,10 +618,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private def checkpointBrokerId(brokerId: Int) {
     var logDirsWithoutMetaProps: List[String] = List()
 
-    for (logDir <- logManager.liveLogDirs) {
-      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir.getAbsolutePath).read()
+    for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
+      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
       if(brokerMetadataOpt.isEmpty)
-        logDirsWithoutMetaProps ++= List(logDir.getAbsolutePath)
+        logDirsWithoutMetaProps ++= List(logDir)
     }
 
     for(logDir <- logDirsWithoutMetaProps) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc999989/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 11e5344..4a415e9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1230,6 +1230,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  // logDir should be an absolute path
   def handleLogDirFailure(dir: String) {
     if (!logManager.isLogDirOnline(dir))
       return

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc999989/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ab840ee..2a08311 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -230,7 +230,11 @@ object TestUtils extends Logging {
     if (nodeId >= 0) props.put(KafkaConfig.BrokerIdProp, nodeId.toString)
     props.put(KafkaConfig.ListenersProp, listeners)
     if (logDirCount > 1) {
-      val logDirs = (1 to logDirCount).toList.map(i => TestUtils.tempDir().getAbsolutePath).mkString(",")
+      val logDirs = (1 to logDirCount).toList.map(i =>
+        // We would like to allow user to specify both relative path and absolute path as log directory for backward-compatibility reason
+        // We can verify this by using a mixture of relative path and absolute path as log directories in the test
+        if (i % 2 == 0) TestUtils.tempDir().getAbsolutePath else TestUtils.tempRelativeDir("data")
+      ).mkString(",")
       props.put(KafkaConfig.LogDirsProp, logDirs)
     } else {
       props.put(KafkaConfig.LogDirProp, TestUtils.tempDir().getAbsolutePath)


Mime
View raw message