kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-9265: Fix kafka.log.Log instance leak on log deletion (#7773)
Date Wed, 04 Dec 2019 14:13:16 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new a784008  KAFKA-9265: Fix kafka.log.Log instance leak on log deletion (#7773)
a784008 is described below

commit a78400827651d952d6f4746dc5bd2205e25862de
Author: Vikas Singh <vikas@confluent.io>
AuthorDate: Wed Dec 4 05:52:29 2019 -0800

    KAFKA-9265: Fix kafka.log.Log instance leak on log deletion (#7773)
    
    KAFKA-8448 fixed a similar leak, in which Log objects were being held
    by the ScheduledExecutor's PeriodicProducerExpirationCheck callback. The
    fix in KAFKA-8448 was to change the policy of the ScheduledExecutor to
    remove a scheduled task when it gets canceled (by calling
    setRemoveOnCancelPolicy(true)).
    
    This works when a log is closed using the close() method. But when a log
    is deleted, either because the topic gets deleted or because a rebalancing
    operation moves the replica away from the broker, the delete() operation
    is invoked instead. Log.delete() doesn't cancel the pending scheduled
    task, and that leaks the Log instance.
    
    The fix is to cancel the scheduled task in the Log.delete() method too.
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
---
 core/src/main/scala/kafka/log/Log.scala            |  1 +
 .../main/scala/kafka/utils/KafkaScheduler.scala    |  2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 24 ++++++++++++++++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6ca83f8..23f5e23 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1798,6 +1798,7 @@ class Log(@volatile var dir: File,
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
         removeLogMetrics()
+        producerExpireCheck.cancel(true)
         logSegments.foreach(_.deleteIfExists())
         segments.clear()
         leaderEpochCache.foreach(_.clear())
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index cee4478..f41b8f8 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -130,7 +130,7 @@ class KafkaScheduler(val threads: Int,
   /**
    * Package private for testing.
    */
-  private[utils] def taskRunning(task: ScheduledFuture[_]): Boolean = {
+  private[kafka] def taskRunning(task: ScheduledFuture[_]): Boolean = {
     executor.getQueue().contains(task)
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index f62205f..0a669c5 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -374,6 +374,30 @@ class LogTest {
     }
   }
 
+  /**
+   * Test that "PeriodicProducerExpirationCheck" scheduled task gets canceled after log
+   * is deleted.
+   */
+  @Test
+  def testProducerExpireCheckAfterDelete(): Unit = {
+    val scheduler = new KafkaScheduler(1)
+    try {
+      scheduler.startup()
+      val logConfig = LogTest.createLogConfig()
+      val log = createLog(logDir, logConfig, scheduler = scheduler)
+
+      val producerExpireCheck = log.producerExpireCheck
+      assertTrue("producerExpireCheck isn't as part of scheduled tasks",
+        scheduler.taskRunning(producerExpireCheck))
+
+      log.delete()
+      assertFalse("producerExpireCheck is part of scheduled tasks even after log deletion",
+        scheduler.taskRunning(producerExpireCheck))
+    } finally {
+      scheduler.shutdown();
+    }
+  }
+
   private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion)
     var log = createLog(logDir, logConfig)


Mime
View raw message