kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: MINOR: Fix open file leak in log cleaner integration tests
Date Thu, 20 Apr 2017 12:19:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 63f01a8af -> 6af876b94


MINOR: Fix open file leak in log cleaner integration tests

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2870 from hachikuji/fix-log-cleaner-test-leak
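
The leak in question: these tests created Log instances (each of which holds open
FileChannels for its segments plus memory-mapped index files) and then deleted the
temp directory in teardown without ever closing them, so open file handles
accumulated across the parameterized runs. The new base class below fixes this by
tracking every Log it creates and closing them all before deleting logDir. A
minimal sketch of the pattern, assuming only that the tracked resources are
AutoCloseable (names simplified; Resource stands in for kafka.log.Log):

    import scala.collection.mutable.ListBuffer

    abstract class ResourceTrackingTest {
      private val resources = ListBuffer.empty[AutoCloseable]

      // Register every resource a test creates so teardown can close it.
      protected def track[T <: AutoCloseable](resource: T): T = {
        resources += resource
        resource
      }

      // JUnit @After hook in the real class: close the handles first, then
      // it is safe to delete the files underneath them.
      def teardown(): Unit = {
        resources.foreach(_.close())
        resources.clear()
      }
    }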


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6af876b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6af876b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6af876b9

Branch: refs/heads/trunk
Commit: 6af876b94dc588c8d6098a7fef7a18c46625d1d5
Parents: 63f01a8
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Apr 20 11:07:26 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Apr 20 13:19:36 2017 +0100

----------------------------------------------------------------------
 .../log/AbstractLogCleanerIntegrationTest.scala | 113 +++++++++++++++++++
 .../kafka/log/LogCleanerIntegrationTest.scala   |  81 +++----------
 .../log/LogCleanerLagIntegrationTest.scala      |  65 ++---------
 .../unit/kafka/log/ProducerIdMappingTest.scala  |   5 +-
 .../scala/unit/kafka/utils/MockScheduler.scala  |   2 +-
 5 files changed, 137 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6af876b9/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
new file mode 100644
index 0000000..3c9ddd1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.File
+import java.nio.file.Files
+import java.util.Properties
+
+import kafka.utils.{MockTime, Pool, TestUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.After
+
+import scala.collection.mutable.ListBuffer
+
+abstract class AbstractLogCleanerIntegrationTest {
+
+  var cleaner: LogCleaner = _
+  val logDir = TestUtils.tempDir()
+
+  private val logs = ListBuffer.empty[Log]
+  private val defaultMaxMessageSize = 128
+  private val defaultMinCleanableDirtyRatio = 0.0F
+  private val defaultCompactionLag = 0L
+  private val defaultDeleteDelay = 1000
+  private val defaultSegmentSize = 256
+
+  def time: MockTime
+
+  @After
+  def teardown(): Unit = {
+    if (cleaner != null)
+      cleaner.shutdown()
+    time.scheduler.shutdown()
+    logs.foreach(_.close())
+    Utils.delete(logDir)
+  }
+
+  def logConfigProperties(propertyOverrides: Properties = new Properties(),
+                          maxMessageSize: Int,
+                          minCleanableDirtyRatio: Float = defaultMinCleanableDirtyRatio,
+                          compactionLag: Long = defaultCompactionLag,
+                          deleteDelay: Int = defaultDeleteDelay,
+                          segmentSize: Int = defaultSegmentSize): Properties = {
+    val props = new Properties()
+    props.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
+    props.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+    props.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
+    props.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
+    props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
+    props.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
+    props.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Long)
+    props.putAll(propertyOverrides)
+    props
+  }
+
+  def makeCleaner(partitions: Iterable[TopicPartition],
+                  minCleanableDirtyRatio: Float = defaultMinCleanableDirtyRatio,
+                  numThreads: Int = 1,
+                  backOffMs: Long = 15000L,
+                  maxMessageSize: Int = defaultMaxMessageSize,
+                  compactionLag: Long = defaultCompactionLag,
+                  deleteDelay: Int = defaultDeleteDelay,
+                  segmentSize: Int = defaultSegmentSize,
+                  propertyOverrides: Properties = new Properties()): LogCleaner = {
+
+    val logMap = new Pool[TopicPartition, Log]()
+    for (partition <- partitions) {
+      val dir = new File(logDir, s"${partition.topic}-${partition.partition}")
+      Files.createDirectories(dir.toPath)
+
+      val logConfig = LogConfig(logConfigProperties(propertyOverrides,
+        maxMessageSize = maxMessageSize,
+        minCleanableDirtyRatio = minCleanableDirtyRatio,
+        compactionLag = compactionLag,
+        deleteDelay = deleteDelay,
+        segmentSize = segmentSize))
+      val log = new Log(dir,
+        logConfig,
+        logStartOffset = 0L,
+        recoveryPoint = 0L,
+        scheduler = time.scheduler,
+        time = time)
+      logMap.put(partition, log)
+      this.logs += log
+    }
+
+    val cleanerConfig = CleanerConfig(
+      numThreads = numThreads,
+      ioBufferSize = maxMessageSize / 2,
+      maxMessageSize = maxMessageSize,
+      backOffMs = backOffMs)
+    new LogCleaner(cleanerConfig,
+      logDirs = Array(logDir),
+      logs = logMap,
+      time = time)
+  }
+}
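
For reference, a subclass of the new base class only needs to provide time and
build its cleaner through makeCleaner; the base class then owns the log lifecycle.
A minimal hypothetical subclass (not part of this commit) might look like:

    import kafka.utils.MockTime
    import org.apache.kafka.common.TopicPartition
    import org.junit.Test

    class ExampleCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
      val time = new MockTime()

      @Test
      def example(): Unit = {
        // Logs created by makeCleaner are tracked by the base class and
        // closed automatically in teardown(), so nothing leaks here.
        val tp = new TopicPartition("log", 0)
        cleaner = makeCleaner(partitions = Seq(tp))
        val log = cleaner.logs.get(tp)
        cleaner.startup()
        // ... append records, wait for a clean, assert on log contents ...
      }
    }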

http://git-wip-us.apache.org/repos/asf/kafka/blob/6af876b9/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 7ef7559..19a97bc 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -26,7 +26,6 @@ import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
@@ -41,17 +40,12 @@ import scala.util.Random
  * This is an integration test that tests the fully integrated log cleaner
  */
 @RunWith(value = classOf[Parameterized])
-class LogCleanerIntegrationTest(compressionCodec: String) {
+class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCleanerIntegrationTest {
 
   val codec = CompressionType.forName(compressionCodec)
   val time = new MockTime()
-  val segmentSize = 256
-  val deleteDelay = 1000
-  val logName = "log"
-  val logDir = TestUtils.tempDir()
   var counter = 0
-  var cleaner: LogCleaner = _
-  val topics = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
+  val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
 
   @Test
   def cleanerTest() {
@@ -59,8 +53,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
     val maxMessageSize = largeMessageSet.sizeInBytes
 
-    cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
-    val log = cleaner.logs.get(topics(0))
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
+    val log = cleaner.logs.get(topicPartitions(0))
 
     val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
     val startSize = log.size
@@ -86,11 +80,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     // simulate deleting a partition, by removing it from logs
     // force a checkpoint
     // and make sure its gone from checkpoint file
-    cleaner.logs.remove(topics(0))
+    cleaner.logs.remove(topicPartitions(0))
     cleaner.updateCheckpoints(logDir)
-    val checkpoints = new OffsetCheckpointFile(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read()
+    val checkpoints = new OffsetCheckpointFile(new File(logDir, cleaner.cleanerManager.offsetCheckpointFile)).read()
     // we expect partition 0 to be gone
-    assertFalse(checkpoints.contains(topics(0)))
+    assertFalse(checkpoints.contains(topicPartitions(0)))
   }
 
   @Test
@@ -101,8 +95,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     logProps.put(LogConfig.CleanupPolicyProp, "compact,delete")
 
     def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String, Long)]) = {
-      cleaner = makeCleaner(parts = 1, propertyOverrides = logProps, logCleanerBackOffMillis = 100L)
-      val log = cleaner.logs.get(topics(0))
+      cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backOffMs = 100L)
+      val log = cleaner.logs.get(topicPartitions(0))
 
       val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
       val startSize = log.size
@@ -158,9 +152,9 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
         largeMessageSet.sizeInBytes + 5
     }
 
-    cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
 
-    val log = cleaner.logs.get(topics(0))
+    val log = cleaner.logs.get(topicPartitions(0))
     val props = logConfigProperties(maxMessageSize = maxMessageSize)
     props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
     log.config = new LogConfig(props)
@@ -197,9 +191,9 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   @Test
   def testCleaningNestedMessagesWithMultipleVersions(): Unit = {
     val maxMessageSize = 192
-    cleaner = makeCleaner(parts = 3, maxMessageSize = maxMessageSize)
+    cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
 
-    val log = cleaner.logs.get(topics(0))
+    val log = cleaner.logs.get(topicPartitions(0))
     val props = logConfigProperties(maxMessageSize = maxMessageSize)
     props.put(LogConfig.MessageFormatVersionProp, KAFKA_0_9_0.version)
     log.config = new LogConfig(props)
@@ -289,55 +283,6 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
   }
 
-  @After
-  def tearDown(): Unit = {
-    cleaner.shutdown()
-    time.scheduler.shutdown()
-    Utils.delete(logDir)
-  }
-
-  private def logConfigProperties(propertyOverrides: Properties = new Properties(), maxMessageSize: Int, minCleanableDirtyRatio: Float = 0.0F): Properties = {
-    val props = new Properties()
-    props.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
-    props.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
-    props.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
-    props.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
-    props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-    props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
-    props.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
-    props.putAll(propertyOverrides)
-    props
-  }
-  
-  /* create a cleaner instance and logs with the given parameters */
-  private def makeCleaner(parts: Int,
-                          minCleanableDirtyRatio: Float = 0.0F,
-                          numThreads: Int = 1,
-                          maxMessageSize: Int = 128,
-                          logCleanerBackOffMillis: Long = 15000L,
-                          propertyOverrides: Properties = new Properties()): LogCleaner = {
-    
-    // create partitions and add them to the pool
-    val logs = new Pool[TopicPartition, Log]()
-    for(i <- 0 until parts) {
-      val dir = new File(logDir, "log-" + i)
-      dir.mkdirs()
-
-      val log = new Log(dir,
-                        LogConfig(logConfigProperties(propertyOverrides, maxMessageSize, minCleanableDirtyRatio)),
-                        logStartOffset = 0L,
-                        recoveryPoint = 0L,
-                        scheduler = time.scheduler,
-                        time = time)
-      logs.put(new TopicPartition("log", i), log)
-    }
-  
-    new LogCleaner(CleanerConfig(numThreads = numThreads, ioBufferSize = maxMessageSize / 2, maxMessageSize = maxMessageSize, backOffMs = logCleanerBackOffMillis),
-                   logDirs = Array(logDir),
-                   logs = logs,
-                   time = time)
-  }
-
 }
 
 object LogCleanerIntegrationTest {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6af876b9/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 6b9f4ea..eb3f50c 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -17,13 +17,9 @@
 
 package kafka.log
 
-import java.io.File
-import java.util.Properties
-
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
@@ -36,7 +32,7 @@ import scala.collection._
   * This is an integration test that tests the fully integrated log cleaner
   */
 @RunWith(value = classOf[Parameterized])
-class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging {
+class LogCleanerLagIntegrationTest(compressionCodecName: String) extends AbstractLogCleanerIntegrationTest with Logging {
   val msPerHour = 60 * 60 * 1000
 
   val compactionLag = 1 * msPerHour
@@ -44,18 +40,18 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
 
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
   val cleanerBackOffMs = 200L
-  val segmentSize = 100
-  val deleteDelay = 1000
-  val logName = "log"
-  val logDir = TestUtils.tempDir()
+  val segmentSize = 512
   var counter = 0
-  val topics = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
+  val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
   val compressionCodec = CompressionType.forName(compressionCodecName)
 
   @Test
   def cleanerTest(): Unit = {
-    val cleaner = makeCleaner(parts = 3, backOffMs = cleanerBackOffMs)
-    val log = cleaner.logs.get(topics(0))
+    cleaner = makeCleaner(partitions = topicPartitions,
+      backOffMs = cleanerBackOffMs,
+      compactionLag = compactionLag,
+      segmentSize = segmentSize)
+    val log = cleaner.logs.get(topicPartitions(0))
 
     // t = T0
     val T0 = time.milliseconds
@@ -98,9 +94,6 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
     assertTrue(s"log cleaner should have processed up to offset $firstBlock1SegmentBaseOffset, but lastCleaned=$lastCleaned", lastCleaned >= firstBlock1SegmentBaseOffset)
     assertTrue(s"log should have been compacted: size up to offset of active segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize",
       sizeUpToActiveSegmentAtT0 > compactedSize)
-
-    cleaner.logs.remove(topics(0))
-    cleaner.shutdown()
   }
 
   private def readFromLog(log: Log): Iterable[(Int, Int)] = {
@@ -122,48 +115,6 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
     }
   }
 
-  @After
-  def teardown(): Unit = {
-    time.scheduler.shutdown()
-    Utils.delete(logDir)
-  }
-
-  /* create a cleaner instance and logs with the given parameters */
-  private def makeCleaner(parts: Int,
-                  minCleanableDirtyRatio: Float = 0.0F,
-                  numThreads: Int = 1,
-                  backOffMs: Long = 200L,
-                  defaultPolicy: String = "compact",
-                  policyOverrides: Map[String, String] = Map()): LogCleaner = {
-
-    // create partitions and add them to the pool
-    val logs = new Pool[TopicPartition, Log]()
-    for(i <- 0 until parts) {
-      val dir = new File(logDir, "log-" + i)
-      dir.mkdirs()
-      val logProps = new Properties()
-      logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
-      logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
-      logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
-      logProps.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Integer)
-      logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-      logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
-
-      val log = new Log(dir,
-        LogConfig(logProps),
-        logStartOffset = 0L,
-        recoveryPoint = 0L,
-        scheduler = time.scheduler,
-        time = time)
-      logs.put(new TopicPartition("log", i), log)
-    }
-
-    new LogCleaner(CleanerConfig(numThreads = numThreads, backOffMs = backOffMs),
-      logDirs = Array(logDir),
-      logs = logs,
-      time = time)
-  }
-
 }
 
 object LogCleanerLagIntegrationTest {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6af876b9/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala b/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
index d27f0ca..3b78921 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerIdMappingTest.scala
@@ -23,7 +23,7 @@ import java.util.Properties
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{DuplicateSequenceNumberException, OutOfOrderSequenceException, ProducerFencedException}
-import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.utils.{MockTime, Utils}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -52,8 +52,7 @@ class ProducerIdMappingTest extends JUnitSuite {
 
   @After
   def tearDown(): Unit = {
-    idMappingDir.listFiles().foreach(f => f.delete())
-    idMappingDir.deleteOnExit()
+    Utils.delete(idMappingDir)
   }
 
   @Test
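
The ProducerIdMappingTest change above is the same idea in miniature: the old
teardown deleted only the directory's immediate children and then relied on
deleteOnExit, which defers the directory removal to JVM exit (and silently fails
if anything is left inside), so files could pile up across tests. Utils.delete
removes the whole tree immediately. A rough equivalent of what it does (a sketch,
not the actual implementation, which walks the tree via java.nio.file):

    import java.io.File

    // Depth-first recursive delete: remove children before their parent,
    // immediately rather than at JVM exit.
    def deleteRecursively(file: File): Unit = {
      if (file.isDirectory)
        Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
      file.delete()
    }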

http://git-wip-us.apache.org/repos/asf/kafka/blob/6af876b9/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index 98ad644..c5f383c 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -71,7 +71,7 @@ class MockScheduler(val time: Time) extends Scheduler {
     }
   }
   
-  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) {
+  def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) {
     this synchronized {
       tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
       tick()

