Date: Tue, 13 Mar 2018 05:21:49 +0000
From: junrao@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Subject: [kafka] branch 1.1 updated: KAFKA-6624; Prevent concurrent log flush and log deletion (#4663)
Message-ID: <152091850938.7188.2507516921263055091@gitbox.apache.org>
X-Git-Repo: kafka
X-Git-Refname: refs/heads/1.1
X-Git-Oldrev: e334e25fad54dc1f523322e28b85d229adeb7d29
X-Git-Newrev: 91a14d3c23bbc3da6f33c9f0bdfda37d0864e726

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 91a14d3  KAFKA-6624; Prevent concurrent log flush and log deletion (#4663)
91a14d3 is described below

commit 91a14d3c23bbc3da6f33c9f0bdfda37d0864e726
Author: Dong Lin
AuthorDate: Mon Mar 12 22:20:44 2018 -0700

    KAFKA-6624; Prevent concurrent log flush and log deletion (#4663)

    KAFKA-6624; Prevent concurrent log flush and log deletion

    Reviewers: Ted Yu, Jun Rao
---
 core/src/main/scala/kafka/log/LogManager.scala | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 9ae93aa..7aa5bcd 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -75,7 +75,8 @@ class LogManager(logDirs: Seq[File],
   // from one log directory to another log directory on the same broker. The directory of the future log will be renamed
   // to replace the current log of the partition after the future log catches up with the current log
   private val futureLogs = new Pool[TopicPartition, Log]()
-  private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
+  // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
+  private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile var currentDefaultConfig = initialDefaultConfig
@@ -240,6 +241,10 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  private def addLogToBeDeleted(log: Log): Unit = {
+    this.logsToBeDeleted.add((log, time.milliseconds()))
+  }
+
   private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
     debug("Loading log '" + logDir.getName + "'")
     val topicPartition = Log.parseTopicPartitionName(logDir)
@@ -260,7 +265,7 @@ class LogManager(logDirs: Seq[File],
       logDirFailureChannel = logDirFailureChannel)
 
     if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
-      this.logsToBeDeleted.add(log)
+      addLogToBeDeleted(log)
     } else {
       val previous = {
         if (log.isFuture)
@@ -704,9 +709,12 @@ class LogManager(logDirs: Seq[File],
   private def deleteLogs(): Unit = {
     try {
       while (!logsToBeDeleted.isEmpty) {
-        val removedLog = logsToBeDeleted.take()
+        val (removedLog, scheduleTimeMs) = logsToBeDeleted.take()
         if (removedLog != null) {
           try {
+            val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
+            if (waitingTimeMs > 0)
+              Thread.sleep(waitingTimeMs)
             removedLog.delete()
             info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
           } catch {
@@ -767,7 +775,7 @@ class LogManager(logDirs: Seq[File],
         sourceLog.close()
         checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
         checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
-        logsToBeDeleted.add(sourceLog)
+        addLogToBeDeleted(sourceLog)
       } catch {
         case e: KafkaStorageException =>
           // If sourceLog's log directory is offline, we need close its handlers here.
@@ -805,7 +813,7 @@ class LogManager(logDirs: Seq[File],
         removedLog.renameDir(Log.logDeleteDirName(topicPartition))
         checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
         checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
-        logsToBeDeleted.add(removedLog)
+        addLogToBeDeleted(removedLog)
         info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
       } else if (offlineLogDirs.nonEmpty) {
         throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))
-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.
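[Editor's note] For readers skimming the diff, the following is a minimal, self-contained Scala sketch of the scheduling pattern the patch introduces: queue entries carry the time at which deletion was requested, and the deletion pass waits until the configured delay has elapsed before deleting, so a concurrent flush of the same log can finish first. It is not the actual LogManager code; FakeLog, fileDeleteDelayMs's value, and the use of System.currentTimeMillis() are illustrative assumptions standing in for kafka.log.Log, the broker config, and the Time abstraction.

import java.util.concurrent.LinkedBlockingQueue

// Illustrative stand-in for kafka.log.Log; the real class is far richer.
final case class FakeLog(name: String) {
  def delete(): Unit = println(s"deleted $name")
}

object DelayedDeletionSketch {
  // Analogue of the patch's currentDefaultConfig.fileDeleteDelayMs (value assumed here).
  val fileDeleteDelayMs: Long = 60000L

  // Each entry pairs the log with the wall-clock time it was scheduled for deletion,
  // mirroring the LinkedBlockingQueue[(Log, Long)] introduced by the patch.
  private val logsToBeDeleted = new LinkedBlockingQueue[(FakeLog, Long)]()

  def addLogToBeDeleted(log: FakeLog): Unit =
    logsToBeDeleted.add((log, System.currentTimeMillis()))

  // Deletion pass: only delete a log once fileDeleteDelayMs has elapsed since it was
  // scheduled, giving any in-flight flush of that log time to complete.
  def deleteLogs(): Unit = {
    while (!logsToBeDeleted.isEmpty) {
      val (log, scheduleTimeMs) = logsToBeDeleted.take()
      val waitingTimeMs = scheduleTimeMs + fileDeleteDelayMs - System.currentTimeMillis()
      if (waitingTimeMs > 0)
        Thread.sleep(waitingTimeMs)
      log.delete()
    }
  }

  def main(args: Array[String]): Unit = {
    addLogToBeDeleted(FakeLog("topic-0.deleted"))
    deleteLogs()
  }
}

In the real broker the deletion pass runs on the scheduler thread pool rather than in main, but the ordering guarantee is the same: deletion of a scheduled log is deferred by at least the configured delay.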