kafka-commits mailing list archives

From ij...@apache.org
Subject [2/2] kafka git commit: KAFKA-5663; Pass non-null logDirFailureChannel to Log.apply
Date Wed, 02 Aug 2017 22:54:09 GMT
KAFKA-5663; Pass non-null logDirFailureChannel to Log.apply

Also:
- Improve logging
- Remove dangerous default arguments in Log.apply
- Improve naming of methods and fields in LogDirFailureChannel
- Some clean-ups

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>, Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3594 from lindong28/KAFKA-5663
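
For context, below is a minimal standalone sketch of the failure-reporting pattern this change
standardizes on. It is not part of the patch: the class mirrors LogDirFailureChannel from the diff
further down, minus the Logging trait, and the println and the sketch's class name are illustrative
assumptions only.

    import java.io.IOException
    import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}

    // Simplified stand-in for kafka.server.LogDirFailureChannel.
    class OfflineLogDirChannelSketch(logDirNum: Int) {
      private val offlineLogDirs = new ConcurrentHashMap[String, String]
      private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)

      // Callers pass the error message and exception along with the dir; the channel
      // logs them and enqueues the directory at most once, however many threads report it.
      def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
        System.err.println(s"$msg: $e")
        if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)
          offlineLogDirQueue.add(logDir)
      }

      // Blocks until a newly offline log directory has been reported.
      def takeNextOfflineLogDir(): String = offlineLogDirQueue.take()
    }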


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb21209b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb21209b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb21209b

Branch: refs/heads/trunk
Commit: fb21209b5ad30001eeace56b3c8ab060e0ceb021
Parents: 125d69c
Author: Dong Lin <lindong28@gmail.com>
Authored: Wed Aug 2 16:05:26 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Aug 2 23:53:56 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |  12 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |   4 +-
 core/src/main/scala/kafka/log/LogConfig.scala   |   3 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  43 +-
 .../kafka/server/LogDirFailureChannel.scala     |  28 +-
 .../scala/kafka/server/ReplicaManager.scala     |   2 +-
 .../server/checkpoints/CheckpointFile.scala     |  10 +-
 .../kafka/api/LogDirFailureTest.scala           |  34 +-
 .../test/scala/other/kafka/StressTestLog.scala  |   7 +-
 .../other/kafka/TestLinearWriteSpeed.scala      |  27 +-
 .../scala/unit/kafka/cluster/ReplicaTest.scala  |  11 +-
 .../log/AbstractLogCleanerIntegrationTest.scala |   5 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |   6 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |  11 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |   6 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 794 +++++++------------
 16 files changed, 441 insertions(+), 562 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0610e87..60ec7a0 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1600,7 +1600,7 @@ class Log(@volatile var dir: File,
       fun
     } catch {
       case e: IOException =>
-        logDirFailureChannel.maybeAddLogFailureEvent(dir.getParent)
+        logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e)
         throw new KafkaStorageException(msg, e)
     }
   }
@@ -1649,14 +1649,14 @@ object Log {
 
   def apply(dir: File,
             config: LogConfig,
-            logStartOffset: Long = 0L,
-            recoveryPoint: Long = 0L,
+            logStartOffset: Long,
+            recoveryPoint: Long,
             scheduler: Scheduler,
             brokerTopicStats: BrokerTopicStats,
             time: Time = Time.SYSTEM,
-            maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
-            producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
-            logDirFailureChannel: LogDirFailureChannel = null): Log = {
+            maxProducerIdExpirationMs: Int,
+            producerIdExpirationCheckIntervalMs: Int,
+            logDirFailureChannel: LogDirFailureChannel): Log = {
     val topicPartition = Log.parseTopicPartitionName(dir)
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 27da43b..85d6487 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -265,8 +265,8 @@ class LogCleaner(val config: CleanerConfig,
           } catch {
             case _: LogCleaningAbortedException => // task can be aborted, let it go.
             case e: IOException =>
-              error(s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent}
due to IOException", e)
-              logDirFailureChannel.maybeAddLogFailureEvent(cleanable.log.dir.getParent)
+              val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent}
due to IOException"
+              logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg,
e)
           } finally {
             cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile,
endOffset)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index ad50aab..8f82e65 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -47,6 +47,7 @@ object Defaults {
   val MinCompactionLagMs = kafka.server.Defaults.LogCleanerMinCompactionLagMs
   val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
   val Compact = kafka.server.Defaults.LogCleanupPolicy
+  val CleanupPolicy = kafka.server.Defaults.LogCleanupPolicy
   val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable
   val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
   val CompressionType = kafka.server.Defaults.CompressionType
@@ -235,7 +236,7 @@ object LogConfig {
         KafkaConfig.LogDeleteDelayMsProp)
       .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
         MinCleanableRatioDoc, KafkaConfig.LogCleanerMinCleanRatioProp)
-      .define(CleanupPolicyProp, LIST, Defaults.Compact, ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc,
+      .define(CleanupPolicyProp, LIST, Defaults.CleanupPolicy, ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc,
         KafkaConfig.LogCleanupPolicyProp)
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
         MEDIUM, UncleanLeaderElectionEnableDoc, KafkaConfig.UncleanLeaderElectionEnableProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index f459cc1..88a0e21 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -194,8 +194,7 @@ class LogManager(logDirs: Array[File],
         Some(lock)
       } catch {
         case e: IOException =>
-          error(s"Disk error while locking directory $dir", e)
-          logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath)
+          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while
locking directory $dir", e)
           None
       }
     }
@@ -214,9 +213,11 @@ class LogManager(logDirs: Array[File],
       logStartOffset = logStartOffset,
       recoveryPoint = logRecoveryPoint,
       maxProducerIdExpirationMs = maxPidExpirationMs,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       scheduler = scheduler,
       time = time,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      logDirFailureChannel = logDirFailureChannel)
 
     if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
       this.logsToBeDeleted.add(current)
@@ -237,7 +238,7 @@ class LogManager(logDirs: Array[File],
     info("Loading logs.")
     val startMs = time.milliseconds
     val threadPools = ArrayBuffer.empty[ExecutorService]
-    val offlineDirs = ArrayBuffer.empty[String]
+    val offlineDirs = ArrayBuffer.empty[(String, IOException)]
     val jobs = mutable.Map.empty[File, Seq[Future[_]]]
 
     for (dir <- liveLogDirs) {
@@ -283,7 +284,7 @@ class LogManager(logDirs: Array[File],
               loadLogs(logDir, recoveryPoints, logStartOffsets)
             } catch {
               case e: IOException =>
-                offlineDirs.append(dir.getAbsolutePath)
+                offlineDirs.append((dir.getAbsolutePath, e))
                 error("Error while loading log dir " + dir.getAbsolutePath, e)
             }
           }
@@ -291,7 +292,7 @@ class LogManager(logDirs: Array[File],
         jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
       } catch {
         case e: IOException =>
-          offlineDirs.append(dir.getAbsolutePath)
+          offlineDirs.append((dir.getAbsolutePath, e))
           error("Error while loading log dir " + dir.getAbsolutePath, e)
       }
     }
@@ -303,11 +304,13 @@ class LogManager(logDirs: Array[File],
           cleanShutdownFile.delete()
         } catch {
           case e: IOException =>
-            offlineDirs.append(cleanShutdownFile.getParent)
+            offlineDirs.append((cleanShutdownFile.getParent, e))
             error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e)
         }
       }
-      offlineDirs.foreach(logDirFailureChannel.maybeAddLogFailureEvent)
+      offlineDirs.foreach { case (dir, e) =>
+        logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean
shutdown file in dir $dir", e)
+      }
     } catch {
       case e: ExecutionException => {
         error("There was an error in one of the threads during logs loading: " + e.getCause)
@@ -500,8 +503,7 @@ class LogManager(logDirs: Array[File],
         this.recoveryPointCheckpoints.get(dir).foreach(_.write(recoveryPoints.get.mapValues(_.recoveryPoint)))
       } catch {
         case e: IOException =>
-          error(s"Disk error while writing to recovery point file in directory $dir", e)
-          logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath)
+          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while
writing to recovery point file in directory $dir", e)
       }
     }
   }
@@ -518,8 +520,7 @@ class LogManager(logDirs: Array[File],
         ))
       } catch {
         case e: IOException =>
-          error(s"Disk error while writing to logStartOffset file in directory $dir", e)
-          logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath)
+          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while
writing to logStartOffset file in directory $dir", e)
       }
     }
   }
@@ -555,9 +556,12 @@ class LogManager(logDirs: Array[File],
             logStartOffset = 0L,
             recoveryPoint = 0L,
             maxProducerIdExpirationMs = maxPidExpirationMs,
+            producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
             scheduler = scheduler,
             time = time,
-            brokerTopicStats = brokerTopicStats)
+            brokerTopicStats = brokerTopicStats,
+            logDirFailureChannel = logDirFailureChannel)
+
           logs.put(topicPartition, log)
 
           info("Created log for partition [%s,%d] in %s with properties {%s}."
@@ -568,8 +572,9 @@ class LogManager(logDirs: Array[File],
           log
         } catch {
           case e: IOException =>
-            logDirFailureChannel.maybeAddLogFailureEvent(dataDir.getAbsolutePath)
-            throw new KafkaStorageException(s"Error while creating log for $topicPartition
in dir ${dataDir.getAbsolutePath}", e)
+            val msg = s"Error while creating log for $topicPartition in dir ${dataDir.getAbsolutePath}"
+            logDirFailureChannel.maybeAddOfflineLogDir(dataDir.getAbsolutePath, msg, e)
+            throw new KafkaStorageException(msg, e)
         }
       }
     }
@@ -635,8 +640,9 @@ class LogManager(logDirs: Array[File],
         }
       } catch {
         case e: IOException =>
-          logDirFailureChannel.maybeAddLogFailureEvent(removedLog.dir.getParent)
-          throw new KafkaStorageException(s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}.",
e)
+          val msg = s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}."
+          logDirFailureChannel.maybeAddOfflineLogDir(removedLog.dir.getParent, msg, e)
+          throw new KafkaStorageException(msg, e)
       }
     } else if (offlineLogDirs.nonEmpty) {
       throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because
it may be in one of the offline directories " + offlineLogDirs.mkString(","))
@@ -727,6 +733,9 @@ class LogManager(logDirs: Array[File],
 }
 
 object LogManager {
+
+  val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000
+
   def apply(config: KafkaConfig,
             initialOfflineDirs: Seq[String],
             zkUtils: ZkUtils,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
index 23d9986..c78f04e 100644
--- a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
+++ b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
@@ -18,29 +18,35 @@
 
 package kafka.server
 
+import java.io.IOException
 import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
 
+import kafka.utils.Logging
+
 /*
- * LogDirFailureChannel allows an external thread to block waiting for new offline log dir.
+ * LogDirFailureChannel allows an external thread to block waiting for new offline log dirs.
  *
- * LogDirFailureChannel should be a singleton object which can be accessed by any class that does disk-IO operation.
- * If IOException is encountered while accessing a log directory, the corresponding class can insert the the log directory name
- * to the LogDirFailureChannel using maybeAddLogFailureEvent(). Then a thread which is blocked waiting for new offline log directories
- * can take the name of the new offline log directory out of the LogDirFailureChannel and handles the log failure properly.
+ * There should be a single instance of LogDirFailureChannel accessible by any class that does disk-IO operation.
+ * If IOException is encountered while accessing a log directory, the corresponding class can add the log directory name
+ * to the LogDirFailureChannel using maybeAddOfflineLogDir(). Each log directory will be added only once. After a log
+ * directory is added for the first time, a thread which is blocked waiting for new offline log directories
+ * can take the name of the new offline log directory out of the LogDirFailureChannel and handle the log failure properly.
+ * An offline log directory will stay offline until the broker is restarted.
  *
  */
-class LogDirFailureChannel(logDirNum: Int) {
+class LogDirFailureChannel(logDirNum: Int) extends Logging {
 
   private val offlineLogDirs = new ConcurrentHashMap[String, String]
-  private val logDirFailureEvent = new ArrayBlockingQueue[String](logDirNum)
+  private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)
 
   /*
    * If the given logDir is not already offline, add it to the
    * set of offline log dirs and enqueue it to the logDirFailureEvent queue
    */
-  def maybeAddLogFailureEvent(logDir: String): Unit = {
+  def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
+    error(msg, e)
     if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) {
-      logDirFailureEvent.add(logDir)
+      offlineLogDirQueue.add(logDir)
     }
   }
 
@@ -48,8 +54,6 @@ class LogDirFailureChannel(logDirNum: Int) {
    * Get the next offline log dir from logDirFailureEvent queue.
    * The method will wait if necessary until a new offline log directory becomes available
    */
-  def takeNextLogFailureEvent(): String = {
-    logDirFailureEvent.take()
-  }
+  def takeNextOfflineLogDir(): String = offlineLogDirQueue.take()
 
 }
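
A standalone usage sketch (not part of the patch) of how a handler thread consumes the channel,
mirroring ReplicaManager.LogDirFailureHandler in the next file. It assumes the post-patch
kafka.server.LogDirFailureChannel is on the classpath; the thread body, directory name, and
message are illustrative only.

    import java.io.IOException
    import kafka.server.LogDirFailureChannel

    object LogDirFailureChannelUsageSketch {
      def main(args: Array[String]): Unit = {
        val channel = new LogDirFailureChannel(logDirNum = 2)

        // Handler thread: blocks until some disk-IO path reports a newly offline directory.
        val handler = new Thread(new Runnable {
          override def run(): Unit = {
            val offlineDir = channel.takeNextOfflineLogDir()
            System.err.println(s"Handling offline log dir: $offlineDir")
          }
        })
        handler.start()

        // Any IO failure path reports the directory together with a message and the exception;
        // repeated reports for the same directory are ignored.
        channel.maybeAddOfflineLogDir("/tmp/kafka-logs-0",
          "Disk error while writing to recovery point file", new IOException("disk failed"))

        handler.join()
      }
    }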

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b66aba0..11e5344 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -189,7 +189,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
     override def doWork() {
-      val newOfflineLogDir = logDirFailureChannel.takeNextLogFailureEvent()
+      val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
       if (haltBrokerOnDirFailure) {
         fatal(s"Halting broker because dir $newOfflineLogDir is offline")
         Exit.halt(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index 7b67559..4c1011f 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -72,8 +72,9 @@ class CheckpointFile[T](val file: File,
         Utils.atomicMoveWithFallback(tempPath, path)
       } catch {
         case e: IOException =>
-          logDirFailureChannel.maybeAddLogFailureEvent(logDir)
-          throw new KafkaStorageException(s"Error while writing to checkpoint file ${file.getAbsolutePath}",
e)
+          val msg = s"Error while writing to checkpoint file ${file.getAbsolutePath}"
+          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+          throw new KafkaStorageException(msg, e)
       }
     }
   }
@@ -119,8 +120,9 @@ class CheckpointFile[T](val file: File,
         }
       } catch {
         case e: IOException =>
-          logDirFailureChannel.maybeAddLogFailureEvent(logDir)
-          throw new KafkaStorageException(s"Error while reading checkpoint file ${file.getAbsolutePath}",
e)
+          val msg = s"Error while reading checkpoint file ${file.getAbsolutePath}"
+          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+          throw new KafkaStorageException(msg, e)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
index 04be8fd..6749a57 100644
--- a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala
@@ -34,6 +34,9 @@ import org.junit.Assert.assertTrue
   * Test whether clients can producer and consume when there is log directory failure
   */
 class LogDirFailureTest extends IntegrationTestHarness {
+
+  import kafka.api.LogDirFailureTest._
+
   val producerCount: Int = 1
   val consumerCount: Int = 1
   val serverCount: Int = 2
@@ -42,7 +45,8 @@ class LogDirFailureTest extends IntegrationTestHarness {
   this.logDirCount = 2
   this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
   this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
-  this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "100")
+  this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000")
+
 
   @Before
   override def setUp() {
@@ -51,8 +55,16 @@ class LogDirFailureTest extends IntegrationTestHarness {
   }
 
   @Test
-  def testProduceAfterLogDirFailure() {
+  def testIOExceptionDuringLogRoll() {
+    testProduceAfterLogDirFailure(Roll)
+  }
+
+  @Test
+  def testIOExceptionDuringCheckpoint() {
+    testProduceAfterLogDirFailure(Checkpoint)
+  }
 
+  def testProduceAfterLogDirFailure(failureType: LogDirFailureType) {
     val consumer = consumers.head
     subscribeAndWaitForAssignment(topic, consumer)
     val producer = producers.head
@@ -75,6 +87,17 @@ class LogDirFailureTest extends IntegrationTestHarness {
     logDir.createNewFile()
     assertTrue(logDir.isFile)
 
+    if (failureType == Roll) {
+      try {
+        leaderServer.replicaManager.getLog(partition).get.roll()
+        fail("Log rolling should fail with KafkaStorageException")
+      } catch {
+        case e: KafkaStorageException => // This is expected
+      }
+    } else if (failureType == Checkpoint) {
+      leaderServer.replicaManager.checkpointHighWatermarks()
+    }
+
     // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
     TestUtils.waitUntilTrue(() => !leaderServer.logManager.liveLogDirs.contains(logDir), "Expected log directory offline", 3000L)
     assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty)
@@ -123,3 +146,10 @@ class LogDirFailureTest extends IntegrationTestHarness {
   }
 
 }
+
+object LogDirFailureTest {
+  sealed trait LogDirFailureType
+  case object Roll extends LogDirFailureType
+  case object Checkpoint extends LogDirFailureType
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 5355ca2..1710da7 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import java.util.concurrent.atomic._
 
 import kafka.log._
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
 import org.apache.kafka.common.record.FileRecords
@@ -49,7 +49,10 @@ object StressTestLog {
       recoveryPoint = 0L,
       scheduler = time.scheduler,
       time = time,
-      brokerTopicStats = new BrokerTopicStats)
+      maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      brokerTopicStats = new BrokerTopicStats,
+      logDirFailureChannel = new LogDirFailureChannel(10))
     val writer = new WriterThread(log)
     writer.start()
     val reader = new ReaderThread(log)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index f211c4c..e05f29d 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -25,7 +25,7 @@ import java.util.{Properties, Random}
 import joptsimple._
 import kafka.log._
 import kafka.message._
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -85,9 +85,9 @@ object TestLinearWriteSpeed {
    val mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.")
    val channelOpt = parser.accepts("channel", "Do writes to file channels.")
    val logOpt = parser.accepts("log", "Do writes to kafka logs.")
-                          
+
     val options = parser.parse(args : _*)
-    
+
     CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, filesOpt)
 
     var bytesToWrite = options.valueOf(bytesOpt).longValue
@@ -125,14 +125,14 @@ object TestLinearWriteSpeed {
         logProperties.put(LogConfig.FlushMessagesProp, flushInterval: java.lang.Long)
         writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet)
       } else {
-        System.err.println("Must specify what to write to with one of --log, --channel, or
--mmap") 
+        System.err.println("Must specify what to write to with one of --log, --channel, or
--mmap")
         Exit.exit(1)
       }
     }
     bytesToWrite = (bytesToWrite / numFiles) * numFiles
-    
+
     println("%10s\t%10s\t%10s".format("mb_sec", "avg_latency", "max_latency"))
-    
+
     val beginTest = System.nanoTime
     var maxLatency = 0L
     var totalLatency = 0L
@@ -170,12 +170,12 @@ object TestLinearWriteSpeed {
     println(bytesToWrite / (1024.0 * 1024.0 * elapsedSecs) + " MB per sec")
     scheduler.shutdown()
   }
-  
+
   trait Writable {
     def write(): Int
     def close()
   }
-  
+
   class MmapWritable(val file: File, size: Long, val content: ByteBuffer) extends Writable {
     file.deleteOnExit()
     val raf = new RandomAccessFile(file, "rw")
@@ -190,7 +190,7 @@ object TestLinearWriteSpeed {
       raf.close()
     }
   }
-  
+
   class ChannelWritable(val file: File, val content: ByteBuffer) extends Writable {
     file.deleteOnExit()
     val raf = new RandomAccessFile(file, "rw")
@@ -204,10 +204,11 @@ object TestLinearWriteSpeed {
       raf.close()
     }
   }
-  
+
   class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
     Utils.delete(dir)
-    val log = Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM)
+    val log = Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM, 60 * 60 * 1000,
+      LogManager.ProducerIdExpirationCheckIntervalMs, new LogDirFailureChannel(10))
     def write(): Int = {
       log.appendAsLeader(messages, leaderEpoch = 0)
       messages.sizeInBytes
@@ -217,5 +218,5 @@ object TestLinearWriteSpeed {
       Utils.delete(log.dir)
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
index 839b9d9..b3d4468 100644
--- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
@@ -18,8 +18,8 @@ package kafka.cluster
 
 import java.util.Properties
 
-import kafka.log.{Log, LogConfig}
-import kafka.server.{BrokerTopicStats, LogOffsetMetadata}
+import kafka.log.{Log, LogConfig, LogManager}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
@@ -49,7 +49,10 @@ class ReplicaTest {
       recoveryPoint = 0L,
       scheduler = time.scheduler,
       brokerTopicStats = brokerTopicStats,
-      time = time)
+      time = time,
+      maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10))
 
     replica = new Replica(brokerId = 0,
       topicPartition = new TopicPartition("foo", 0),
@@ -108,7 +111,7 @@ class ReplicaTest {
       assertTrue(replica.logStartOffset <= hw)
 
       // verify that all segments up to the high watermark have been deleted
-      
+
       log.logSegments.headOption.foreach { segment =>
         assertTrue(segment.baseOffset <= hw)
         assertTrue(segment.baseOffset >= replica.logStartOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index d6f0a56..34baf89 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -97,7 +97,10 @@ abstract class AbstractLogCleanerIntegrationTest {
         recoveryPoint = 0L,
         scheduler = time.scheduler,
         time = time,
-        brokerTopicStats = new BrokerTopicStats)
+        brokerTopicStats = new BrokerTopicStats,
+        maxProducerIdExpirationMs = 60 * 60 * 1000,
+        producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+        logDirFailureChannel = new LogDirFailureChannel(10))
       logMap.put(partition, log)
       this.logs += log
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 9c727c6..1cf393e 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec
 import org.apache.kafka.common.utils.Utils
 import java.util.{Collection, Properties}
 
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 
 import scala.collection.JavaConverters._
 
@@ -56,7 +56,9 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
     logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
     /*configure broker-side compression  */
     val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      time = time, brokerTopicStats = new BrokerTopicStats)
+      time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10))
 
     /* append two messages */
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index f4eabc0..c9f5441 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.log
 import java.io.File
 import java.util.Properties
 
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
@@ -236,13 +236,18 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
       recoveryPoint = 0L,
       scheduler = time.scheduler,
       time = time,
-      brokerTopicStats = new BrokerTopicStats)
+      brokerTopicStats = new BrokerTopicStats,
+      maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10))
     log
   }
 
   private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
     Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      time = time, brokerTopicStats = new BrokerTopicStats)
+      time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10))
 
   private def records(key: Int, value: Int, timestamp: Long) =
     MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 689a032..c29ece5 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -23,7 +23,7 @@ import java.nio.file.Paths
 import java.util.Properties
 
 import kafka.common._
-import kafka.server.BrokerTopicStats
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
@@ -1238,7 +1238,9 @@ class LogCleanerTest extends JUnitSuite {
 
   private def makeLog(dir: File = dir, config: LogConfig = logConfig, recoveryPoint: Long = 0L) =
     Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = recoveryPoint, scheduler = time.scheduler,
-      time = time, brokerTopicStats = new BrokerTopicStats)
+      time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10))
 
   private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */  }
 

