kafka-commits mailing list archives

From j...@apache.org
Subject [3/5] kafka git commit: KAFKA-4763; Handle disk failure for JBOD (KIP-112)
Date Sat, 22 Jul 2017 19:36:04 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4898d11..27da43b 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import java.io.File
+import java.io.{File, IOException}
 import java.nio._
 import java.util.Date
 import java.util.concurrent.{CountDownLatch, TimeUnit}
@@ -25,6 +25,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import com.yammer.metrics.core.Gauge
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.LogDirFailureChannel
 import kafka.utils._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
@@ -38,29 +39,29 @@ import scala.collection.JavaConverters._
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
  * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
- * 
+ *
  * Each log can be thought of being split into two sections of segments: a "clean" section which has previously been cleaned followed by a
  * "dirty" section that has not yet been cleaned. The dirty section is further divided into the "cleanable" section followed by an "uncleanable" section.
  * The uncleanable section is excluded from cleaning. The active log segment is always uncleanable. If there is a
  * compaction lag time set, segments whose largest message timestamp is within the compaction lag time of the cleaning operation are also uncleanable.
  *
  * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "compact" retention policy
- * and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log. 
- * 
+ * and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log.
+ *
  * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See kafka.log.OffsetMap for details of
- * the implementation of the mapping. 
- * 
- * Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a 
+ * the implementation of the mapping.
+ *
+ * Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a
  * higher offset than what is found in the segment (i.e. messages with a key that appears in the dirty section of the log).
- * 
+ *
 * To avoid segments shrinking to very small sizes with repeated cleanings, we merge successive segments during a cleaning if their combined log and
 * index sizes are less than the maximum log and index size as of the start of the clean.
- * 
+ *
  * Cleaned segments are swapped into the log as they become available.
- * 
+ *
  * One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.
- * 
- * Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner. 
+ *
+ * Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner.
  * The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic
  * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed).
  * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
@@ -88,29 +89,30 @@ import scala.collection.JavaConverters._
 class LogCleaner(val config: CleanerConfig,
                  val logDirs: Array[File],
                  val logs: Pool[TopicPartition, Log],
+                 val logDirFailureChannel: LogDirFailureChannel,
                  time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
-  
+
   /* for managing the state of partitions being cleaned. package-private to allow access in tests */
-  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
+  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel)
 
   /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
-  private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, 
-                                        checkIntervalMs = 300, 
-                                        throttleDown = true, 
+  private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
+                                        checkIntervalMs = 300,
+                                        throttleDown = true,
                                         "cleaner-io",
                                         "bytes",
                                         time = time)
-  
+
   /* the threads */
   private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
-  
+
   /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
-  newGauge("max-buffer-utilization-percent", 
+  newGauge("max-buffer-utilization-percent",
            new Gauge[Int] {
              def value: Int = cleaners.map(_.lastStats).map(100 * _.bufferUtilization).max.toInt
            })
   /* a metric to track the recopy rate of each thread's last cleaning */
-  newGauge("cleaner-recopy-percent", 
+  newGauge("cleaner-recopy-percent",
            new Gauge[Int] {
              def value: Int = {
                val stats = cleaners.map(_.lastStats)
@@ -123,7 +125,7 @@ class LogCleaner(val config: CleanerConfig,
            new Gauge[Int] {
              def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
            })
-  
+
   /**
    * Start the background cleaning
    */
@@ -131,7 +133,7 @@ class LogCleaner(val config: CleanerConfig,
     info("Starting the log cleaner")
     cleaners.foreach(_.start())
   }
-  
+
   /**
    * Stop the background cleaning
    */
@@ -139,7 +141,7 @@ class LogCleaner(val config: CleanerConfig,
     info("Shutting down the log cleaner.")
     cleaners.foreach(_.shutdown())
   }
-  
+
   /**
    *  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
    *  the partition is aborted.
@@ -155,6 +157,10 @@ class LogCleaner(val config: CleanerConfig,
     cleanerManager.updateCheckpoints(dataDir, update=None)
   }
 
+  def handleLogDirFailure(dir: String) {
+    cleanerManager.handleLogDirFailure(dir)
+  }
+
   /**
    * Truncate cleaner offset checkpoint for the given partition if its checkpointed offset is larger than the given offset
    */
@@ -197,21 +203,21 @@ class LogCleaner(val config: CleanerConfig,
     }
     isCleaned
   }
-  
+
   /**
   * The cleaner threads do the actual log cleaning. Each thread does its cleaning repeatedly by
    * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
    */
   private class CleanerThread(threadId: Int)
     extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
-    
+
     override val loggerName = classOf[LogCleaner].getName
-    
+
     if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
       warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
 
     val cleaner = new Cleaner(id = threadId,
-                              offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, 
+                              offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
                                                               hashAlgorithm = config.hashAlgorithm),
                               ioBufferSize = config.ioBufferSize / config.numThreads / 2,
                               maxIoBufferSize = config.maxMessageSize,
@@ -219,7 +225,7 @@ class LogCleaner(val config: CleanerConfig,
                               throttler = throttler,
                               time = time,
                               checkDone = checkDone)
-    
+
     @volatile var lastStats: CleanerStats = new CleanerStats()
     private val backOffWaitLatch = new CountDownLatch(1)
 
@@ -241,7 +247,7 @@ class LogCleaner(val config: CleanerConfig,
     	 backOffWaitLatch.countDown()
     	 awaitShutdown()
      }
-     
+
     /**
      * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
@@ -258,6 +264,9 @@ class LogCleaner(val config: CleanerConfig,
             endOffset = nextDirtyOffset
           } catch {
             case _: LogCleaningAbortedException => // task can be aborted, let it go.
+            case e: IOException =>
+              error(s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException", e)
+              logDirFailureChannel.maybeAddLogFailureEvent(cleanable.log.dir.getParent)
           } finally {
             cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
           }
@@ -275,36 +284,36 @@ class LogCleaner(val config: CleanerConfig,
       if (!cleaned)
         backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
     }
-    
+
     /**
      * Log out statistics on a single run of the cleaner.
      */
     def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
       this.lastStats = stats
       def mb(bytes: Double) = bytes / (1024*1024)
-      val message = 
-        "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) + 
-        "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead), 
-                                                                                stats.elapsedSecs, 
-                                                                                mb(stats.bytesRead/stats.elapsedSecs)) + 
-        "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead), 
-                                                                                           stats.elapsedIndexSecs, 
-                                                                                           mb(stats.mapBytesRead)/stats.elapsedIndexSecs, 
+      val message =
+        "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
+        "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead),
+                                                                                stats.elapsedSecs,
+                                                                                mb(stats.bytesRead/stats.elapsedSecs)) +
+        "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead),
+                                                                                           stats.elapsedIndexSecs,
+                                                                                           mb(stats.mapBytesRead)/stats.elapsedIndexSecs,
                                                                                            100 * stats.elapsedIndexSecs/stats.elapsedSecs) +
         "\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) +
-        "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead), 
-                                                                                           stats.elapsedSecs - stats.elapsedIndexSecs, 
-                                                                                           mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) + 
+        "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead),
+                                                                                           stats.elapsedSecs - stats.elapsedIndexSecs,
+                                                                                           mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) +
         "\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead), stats.messagesRead) +
-        "\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) + 
-        "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead), 
+        "\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) +
+        "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead),
                                                                    100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
       info(message)
       if (stats.invalidMessagesRead > 0) {
         warn("\tFound %d invalid messages during compaction.".format(stats.invalidMessagesRead))
       }
     }
-   
+
   }
 }
 
@@ -327,14 +336,14 @@ private[log] class Cleaner(val id: Int,
                            throttler: Throttler,
                            time: Time,
                            checkDone: (TopicPartition) => Unit) extends Logging {
-  
+
   override val loggerName = classOf[LogCleaner].getName
 
   this.logIdent = "Cleaner " + id + ": "
 
   /* buffer used for read i/o */
   private var readBuffer = ByteBuffer.allocate(ioBufferSize)
-  
+
   /* buffer used for write i/o */
   private var writeBuffer = ByteBuffer.allocate(ioBufferSize)
 
@@ -352,7 +361,7 @@ private[log] class Cleaner(val id: Int,
   private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
     // figure out the timestamp below which it is safe to remove delete tombstones
     // this position is defined to be a configurable time beneath the last modified time of the last clean segment
-    val deleteHorizonMs = 
+    val deleteHorizonMs =
       cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
         case None => 0L
         case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs

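As an illustration of the two-pass compaction the class comment above describes, here is
a toy, self-contained model with records as (offset, key, value) triples; tombstone
retention, segment grouping, and the SkimpyOffsetMap are all simplified away:

    object CompactionSketch {
      // pass 1: build key -> last offset over the dirty section (later entries win);
      // pass 2: keep a record only if it carries the latest offset for its key
      def compact(records: Seq[(Long, String, String)]): Seq[(Long, String, String)] = {
        val lastOffset = records.map { case (off, key, _) => key -> off }.toMap
        records.filter { case (off, key, _) => lastOffset(key) == off }
      }

      def main(args: Array[String]): Unit = {
        val log = Seq((0L, "k1", "a"), (1L, "k2", "b"), (2L, "k1", "c"))
        println(compact(log)) // List((1,k2,b), (2,k1,c))
      }
    }

The IOException handler added to the cleaner thread hands the failed directory to the
LogDirFailureChannel (introduced elsewhere in this patch) instead of letting the
exception kill the thread. A minimal stand-in with the semantics the call site implies
(enqueue each failed directory at most once, for a handler thread to consume) might look
like the following; this is a sketch, not Kafka's actual class:

    import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}

    class FailureChannelSketch(logDirNum: Int) {
      private val offlineDirs = new ConcurrentHashMap[String, String]()
      // capacity = number of log dirs, so add() below can never overflow
      private val queue = new ArrayBlockingQueue[String](logDirNum)

      // deduplicates per directory: repeated IOExceptions enqueue only one event
      def maybeAddLogFailureEvent(dir: String): Unit =
        if (offlineDirs.putIfAbsent(dir, dir) == null)
          queue.add(dir)

      // blocks until some thread reports a failed log directory
      def takeNextLogFailureEvent(): String = queue.take()
    }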
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index ed0cb69..af8707c 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -24,11 +24,13 @@ import java.util.concurrent.locks.ReentrantLock
 import com.yammer.metrics.core.Gauge
 import kafka.common.LogCleaningAbortedException
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.LogDirFailureChannel
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.CoreUtils._
 import kafka.utils.{Logging, Pool}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.errors.KafkaStorageException
 
 import scala.collection.{immutable, mutable}
 
@@ -45,7 +47,9 @@ private[log] case object LogCleaningPaused extends LogCleaningState
 *  While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again until cleaning is
  *  requested to be resumed.
  */
-private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicPartition, Log]) extends Logging with KafkaMetricsGroup {
+private[log] class LogCleanerManager(val logDirs: Array[File],
+                                     val logs: Pool[TopicPartition, Log],
+                                     val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
 
   import LogCleanerManager._
 
@@ -53,19 +57,19 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
 
   // package-private for testing
   private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
-  
+
   /* the offset checkpoints holding the last cleaned point for each log */
-  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile)))).toMap
+  @volatile private var checkpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile), logDirFailureChannel))).toMap
 
   /* the set of logs currently being cleaned */
   private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]()
 
   /* a global lock used to control all access to the in-progress set and the offset checkpoints */
   private val lock = new ReentrantLock
-  
+
   /* for coordinating the pausing and the cleaning of a partition */
   private val pausedCleaningCond = lock.newCondition()
-  
+
   /* a gauge for tracking the cleanable ratio of the dirtiest log */
   @volatile private var dirtiestLogCleanableRatio = 0.0
   newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
@@ -77,8 +81,20 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   /**
    * @return the position processed for all logs.
    */
-  def allCleanerCheckpoints: Map[TopicPartition, Long] =
-    checkpoints.values.flatMap(_.read()).toMap
+  def allCleanerCheckpoints: Map[TopicPartition, Long] = {
+    inLock(lock) {
+      checkpoints.values.flatMap(checkpoint => {
+        try {
+          checkpoint.read()
+        } catch {
+          case e: KafkaStorageException =>
+            error(s"Failed to access checkpoint file ${checkpoint.f.getName} in dir ${checkpoint.f.getParentFile.getAbsolutePath}", e)
+            Map.empty[TopicPartition, Long]
+        }
+      }).toMap
+    }
+  }
+
 
    /**
     * Choose the log to clean next and add it to the in-progress set. We recompute this
@@ -217,8 +233,22 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]) {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
-      val existing = checkpoint.read().filterKeys(logs.keys) ++ update
-      checkpoint.write(existing)
+      if (checkpoint != null) {
+        try {
+          val existing = checkpoint.read().filterKeys(logs.keys) ++ update
+          checkpoint.write(existing)
+        } catch {
+          case e: KafkaStorageException =>
+            error(s"Failed to access checkpoint file ${checkpoint.f.getName} in dir ${checkpoint.f.getParentFile.getAbsolutePath}", e)
+        }
+      }
+    }
+  }
+
+  def handleLogDirFailure(dir: String) {
+    info(s"Stopping cleaning logs in dir $dir")
+    inLock(lock) {
+      checkpoints = checkpoints.filterKeys(_.getAbsolutePath != dir)
     }
   }
 
@@ -226,10 +256,11 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     inLock(lock) {
       if (logs.get(topicPartition).config.compact) {
         val checkpoint = checkpoints(dataDir)
-        val existing = checkpoint.read()
-
-        if (existing.getOrElse(topicPartition, 0L) > offset)
-          checkpoint.write(existing + (topicPartition -> offset))
+        if (checkpoint != null) {
+          val existing = checkpoint.read()
+          if (existing.getOrElse(topicPartition, 0L) > offset)
+            checkpoint.write(existing + (topicPartition -> offset))
+        }
       }
     }
   }
@@ -241,7 +272,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     inLock(lock) {
       inProgress(topicPartition) match {
         case LogCleaningInProgress =>
-          updateCheckpoints(dataDir,Option(topicPartition, endOffset))
+          updateCheckpoints(dataDir, Option(topicPartition, endOffset))
           inProgress.remove(topicPartition)
         case LogCleaningAborted =>
           inProgress.put(topicPartition, LogCleaningPaused)

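The recurring pattern in LogCleanerManager: the checkpoint map becomes a @volatile var
so that a failed directory can be dropped atomically under the lock, and each read now
tolerates a storage error on one checkpoint file instead of failing the whole scan. A
compact, self-contained rendering of that idea (the registry class and the read
function are stand-ins, not Kafka types):

    import java.io.{File, IOException}
    import java.util.concurrent.locks.ReentrantLock

    class CheckpointRegistrySketch(dirs: Seq[File]) {
      private val lock = new ReentrantLock()
      @volatile private var checkpoints: Map[File, File] =
        dirs.map(d => d -> new File(d, "cleaner-offset-checkpoint")).toMap

      // drop the failed dir; subsequent reads and writes simply no longer see it
      def handleLogDirFailure(dir: String): Unit = {
        lock.lock()
        try { checkpoints = checkpoints.filter { case (d, _) => d.getAbsolutePath != dir } }
        finally lock.unlock()
      }

      // a failure on one file yields an empty result for that dir, not an aborted scan
      def allOffsets(read: File => Map[String, Long]): Map[String, Long] = {
        lock.lock()
        try {
          checkpoints.values.flatMap { f =>
            try read(f)
            catch {
              case e: IOException =>
                System.err.println(s"skipping unreadable checkpoint ${f.getAbsolutePath}: $e")
                Map.empty[String, Long]
            }
          }.toMap
        } finally lock.unlock()
      }
    }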
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 2df5241..f459cc1 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -21,29 +21,33 @@ import java.io._
 import java.nio.file.Files
 import java.util.concurrent._
 
+import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminUtils
-import kafka.common.{KafkaException, KafkaStorageException}
+import kafka.common.KafkaException
+import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
-
+import org.apache.kafka.common.errors.KafkaStorageException
 import scala.collection.JavaConverters._
 import scala.collection._
+import scala.collection.mutable.ArrayBuffer
 
 /**
 * The entry point to the Kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
  * All read and write operations are delegated to the individual log instances.
- * 
+ *
  * The log manager maintains logs in one or more directories. New logs are created in the data directory
  * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
  * size or I/O rate.
- * 
+ *
  * A background thread handles log retention by periodically truncating excess log segments.
  */
 @threadsafe
-class LogManager(val logDirs: Array[File],
+class LogManager(logDirs: Array[File],
+                 initialOfflineDirs: Array[File],
                  val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
                  val defaultConfig: LogConfig,
                  val cleanerConfig: CleanerConfig,
@@ -56,7 +60,8 @@ class LogManager(val logDirs: Array[File],
                  scheduler: Scheduler,
                  val brokerState: BrokerState,
                  brokerTopicStats: BrokerTopicStats,
-                 time: Time) extends Logging {
+                 logDirFailureChannel: LogDirFailureChannel,
+                 time: Time) extends Logging with KafkaMetricsGroup {
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
   val LockFile = ".lock"
@@ -66,140 +71,243 @@ class LogManager(val logDirs: Array[File],
   private val logs = new Pool[TopicPartition, Log]()
   private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
 
-  createAndValidateLogDirs(logDirs)
-  private val dirLocks = lockLogDirs(logDirs)
-  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile)))).toMap
-  private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile)))).toMap
+  private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
+
+  def liveLogDirs: Array[File] = {
+    if (_liveLogDirs.size() == logDirs.size)
+      logDirs
+    else
+      _liveLogDirs.asScala.toArray
+  }
+
+  private val dirLocks = lockLogDirs(liveLogDirs)
+  @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
+    (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
+  @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
+    (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
+
+  private def offlineLogDirs = logDirs.filterNot(_liveLogDirs.contains)
+
   loadLogs()
 
+
   // public, so we can access this from kafka.admin.DeleteTopicTest
   val cleaner: LogCleaner =
     if(cleanerConfig.enableCleaner)
-      new LogCleaner(cleanerConfig, logDirs, logs, time = time)
+      new LogCleaner(cleanerConfig, liveLogDirs, logs, logDirFailureChannel, time = time)
     else
       null
-  
+
+  val offlineLogDirectoryCount = newGauge(
+    "OfflineLogDirectoryCount",
+    new Gauge[Int] {
+      def value = offlineLogDirs.length
+    }
+  )
+
+  for (dir <- logDirs) {
+    newGauge(
+      "LogDirectoryOffline",
+      new Gauge[Int] {
+        def value = if (_liveLogDirs.contains(dir)) 0 else 1
+      },
+      Map("logDirectory" -> dir.getAbsolutePath)
+    )
+  }
+
   /**
-   * Create and check validity of the given directories, specifically:
+   * Create and check validity of the given directories that are not in the given offline directories, specifically:
    * <ol>
    * <li> Ensure that there are no duplicates in the directory list
    * <li> Create each directory if it doesn't exist
-   * <li> Check that each path is a readable directory 
+   * <li> Check that each path is a readable directory
    * </ol>
    */
-  private def createAndValidateLogDirs(dirs: Seq[File]) {
+  private def createAndValidateLogDirs(dirs: Seq[File], initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File] = {
     if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
-      throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
-    for(dir <- dirs) {
-      if(!dir.exists) {
-        info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
-        val created = dir.mkdirs()
-        if(!created)
-          throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
+      throw new KafkaException("Duplicate log directory found: " + dirs.mkString(", "))
+
+    val liveLogDirs = new ConcurrentLinkedQueue[File]()
+
+    for (dir <- dirs if !initialOfflineDirs.contains(dir)) {
+      try {
+        if (!dir.exists) {
+          info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
+          val created = dir.mkdirs()
+          if (!created)
+            throw new IOException("Failed to create data directory " + dir.getAbsolutePath)
+        }
+        if (!dir.isDirectory || !dir.canRead)
+          throw new IOException(dir.getAbsolutePath + " is not a readable log directory.")
+        liveLogDirs.add(dir)
+      } catch {
+        case e: IOException =>
+          error(s"Failed to create or validate data directory $dir.getAbsolutePath", e)
+      }
+    }
+    if (liveLogDirs.isEmpty) {
+      fatal(s"Shutdown broker because none of the specified log dirs from " + dirs.mkString(", ") + " can be created or validated")
+      Exit.halt(1)
+    }
+
+    liveLogDirs
+  }
+
+  def handleLogDirFailure(dir: String) {
+    info(s"Stopping serving logs in dir $dir")
+    logCreationOrDeletionLock synchronized {
+      _liveLogDirs.remove(new File(dir))
+      if (_liveLogDirs.isEmpty) {
+        fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
+        Exit.halt(1)
       }
-      if(!dir.isDirectory || !dir.canRead)
-        throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
+
+      recoveryPointCheckpoints = recoveryPointCheckpoints.filterKeys(file => file.getAbsolutePath != dir)
+      logStartOffsetCheckpoints = logStartOffsetCheckpoints.filterKeys(file => file.getAbsolutePath != dir)
+      if (cleaner != null)
+        cleaner.handleLogDirFailure(dir)
+
+      val offlineTopicPartitions = logs.filter { case (tp, log) => log.dir.getParent == dir}.map { case (tp, log) => tp }
+
+      offlineTopicPartitions.foreach(topicPartition => {
+        val removedLog = logs.remove(topicPartition)
+        if (removedLog != null) {
+          removedLog.closeHandlers()
+          removedLog.removeLogMetrics()
+        }
+      })
+      info(s"Partitions ${offlineTopicPartitions.mkString(",")} are offline due to failure on log directory $dir")
+      dirLocks.filter(_.file.getParent == dir).foreach(lock => CoreUtils.swallow(lock.destroy()))
     }
   }
-  
+
   /**
    * Lock all the given directories
    */
   private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
-    dirs.map { dir =>
-      val lock = new FileLock(new File(dir, LockFile))
-      if(!lock.tryLock())
-        throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + 
-                               ". A Kafka instance in another process or thread is using this directory.")
-      lock
+    dirs.flatMap { dir =>
+      try {
+        val lock = new FileLock(new File(dir, LockFile))
+        if (!lock.tryLock())
+          throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
+            ". A Kafka instance in another process or thread is using this directory.")
+        Some(lock)
+      } catch {
+        case e: IOException =>
+          error(s"Disk error while locking directory $dir", e)
+          logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath)
+          None
+      }
+    }
+  }
+
+  private def loadLogs(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
+    debug("Loading log '" + logDir.getName + "'")
+    val topicPartition = Log.parseTopicPartitionName(logDir)
+    val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
+    val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
+    val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
+
+    val current = Log(
+      dir = logDir,
+      config = config,
+      logStartOffset = logStartOffset,
+      recoveryPoint = logRecoveryPoint,
+      maxProducerIdExpirationMs = maxPidExpirationMs,
+      scheduler = scheduler,
+      time = time,
+      brokerTopicStats = brokerTopicStats)
+
+    if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
+      this.logsToBeDeleted.add(current)
+    } else {
+      val previous = this.logs.put(topicPartition, current)
+      if (previous != null) {
+        throw new IllegalArgumentException(
+          "Duplicate log directories found: %s, %s!".format(
+            current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+      }
     }
   }
-  
+
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(): Unit = {
     info("Loading logs.")
     val startMs = time.milliseconds
-    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
+    val threadPools = ArrayBuffer.empty[ExecutorService]
+    val offlineDirs = ArrayBuffer.empty[String]
     val jobs = mutable.Map.empty[File, Seq[Future[_]]]
 
-    for (dir <- this.logDirs) {
-      val pool = Executors.newFixedThreadPool(ioThreads)
-      threadPools.append(pool)
-
-      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
-
-      if (cleanShutdownFile.exists) {
-        debug(
-          "Found clean shutdown file. " +
-          "Skipping recovery for all logs in data directory: " +
-          dir.getAbsolutePath)
-      } else {
-        // log recovery itself is being performed by `Log` class during initialization
-        brokerState.newState(RecoveringFromUncleanShutdown)
-      }
-
-      var recoveryPoints = Map[TopicPartition, Long]()
-      try {
-        recoveryPoints = this.recoveryPointCheckpoints(dir).read
-      } catch {
-        case e: Exception =>
-          warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e)
-          warn("Resetting the recovery checkpoint to 0")
-      }
-
-      var logStartOffsets = Map[TopicPartition, Long]()
+    for (dir <- liveLogDirs) {
       try {
-        logStartOffsets = this.logStartOffsetCheckpoints(dir).read
-      } catch {
-        case e: Exception =>
-          warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e)
-      }
+        val pool = Executors.newFixedThreadPool(ioThreads)
+        threadPools.append(pool)
+
+        val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
+
+        if (cleanShutdownFile.exists) {
+          debug(
+            "Found clean shutdown file. " +
+              "Skipping recovery for all logs in data directory: " +
+              dir.getAbsolutePath)
+        } else {
+          // log recovery itself is being performed by `Log` class during initialization
+          brokerState.newState(RecoveringFromUncleanShutdown)
+        }
 
-      val jobsForDir = for {
-        dirContent <- Option(dir.listFiles).toList
-        logDir <- dirContent if logDir.isDirectory
-      } yield {
-        CoreUtils.runnable {
-          debug("Loading log '" + logDir.getName + "'")
+        var recoveryPoints = Map[TopicPartition, Long]()
+        try {
+          recoveryPoints = this.recoveryPointCheckpoints(dir).read
+        } catch {
+          case e: Exception =>
+            warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e)
+            warn("Resetting the recovery checkpoint to 0")
+        }
 
-          val topicPartition = Log.parseTopicPartitionName(logDir)
-          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
-          val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
-          val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
+        var logStartOffsets = Map[TopicPartition, Long]()
+        try {
+          logStartOffsets = this.logStartOffsetCheckpoints(dir).read
+        } catch {
+          case e: Exception =>
+            warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e)
+        }
 
-          val current = Log(
-            dir = logDir,
-            config = config,
-            logStartOffset = logStartOffset,
-            recoveryPoint = logRecoveryPoint,
-            maxProducerIdExpirationMs = maxPidExpirationMs,
-            scheduler = scheduler,
-            time = time,
-            brokerTopicStats = brokerTopicStats)
-          if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
-            this.logsToBeDeleted.add(current)
-          } else {
-            val previous = this.logs.put(topicPartition, current)
-            if (previous != null) {
-              throw new IllegalArgumentException(
-                "Duplicate log directories found: %s, %s!".format(
-                  current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+        val jobsForDir = for {
+          dirContent <- Option(dir.listFiles).toList
+          logDir <- dirContent if logDir.isDirectory
+        } yield {
+          CoreUtils.runnable {
+            try {
+              loadLogs(logDir, recoveryPoints, logStartOffsets)
+            } catch {
+              case e: IOException =>
+                offlineDirs.append(dir.getAbsolutePath)
+                error("Error while loading log dir " + dir.getAbsolutePath, e)
             }
           }
         }
+        jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
+      } catch {
+        case e: IOException =>
+          offlineDirs.append(dir.getAbsolutePath)
+          error("Error while loading log dir " + dir.getAbsolutePath, e)
       }
-
-      jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
     }
 
-
     try {
       for ((cleanShutdownFile, dirJobs) <- jobs) {
         dirJobs.foreach(_.get)
-        cleanShutdownFile.delete()
+        try {
+          cleanShutdownFile.delete()
+        } catch {
+          case e: IOException =>
+            offlineDirs.append(cleanShutdownFile.getParent)
+            error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e)
+        }
       }
+      offlineDirs.foreach(logDirFailureChannel.maybeAddLogFailureEvent)
     } catch {
       case e: ExecutionException => {
         error("There was an error in one of the threads during logs loading: " + e.getCause)
@@ -231,7 +339,7 @@ class LogManager(val logDirs: Array[File],
                          period = flushCheckMs,
                          TimeUnit.MILLISECONDS)
       scheduler.schedule("kafka-recovery-point-checkpoint",
-                         checkpointRecoveryPointOffsets _,
+                         checkpointLogRecoveryOffsets _,
                          delay = InitialTaskDelayMs,
                          period = flushRecoveryOffsetCheckpointMs,
                          TimeUnit.MILLISECONDS)
@@ -256,7 +364,12 @@ class LogManager(val logDirs: Array[File],
   def shutdown() {
     info("Shutting down.")
 
-    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
+    removeMetric("OfflineLogDirectoryCount")
+    for (dir <- logDirs) {
+      removeMetric("LogDirectoryOffline", Map("logDirectory" -> dir.getAbsolutePath))
+    }
+
+    val threadPools = ArrayBuffer.empty[ExecutorService]
     val jobs = mutable.Map.empty[File, Seq[Future[_]]]
 
     // stop the cleaner first
@@ -265,7 +378,7 @@ class LogManager(val logDirs: Array[File],
     }
 
     // close logs in each dir
-    for (dir <- this.logDirs) {
+    for (dir <- liveLogDirs) {
       debug("Flushing and closing logs at " + dir)
 
       val pool = Executors.newFixedThreadPool(ioThreads)
@@ -337,11 +450,12 @@ class LogManager(val logDirs: Array[File],
         }
       }
     }
-    checkpointRecoveryPointOffsets()
+    checkpointLogRecoveryOffsets()
   }
 
   /**
    *  Delete all data in a partition and start the log at the new offset
+   *
    *  @param newOffset The new offset to start the log with
    */
   def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long) {
@@ -357,15 +471,15 @@ class LogManager(val logDirs: Array[File],
         cleaner.resumeCleaning(topicPartition)
       }
     }
-    checkpointRecoveryPointOffsets()
+    checkpointLogRecoveryOffsets()
   }
 
   /**
-   * Write out the current recovery point for all logs to a text file in the log directory 
+   * Write out the current recovery point for all logs to a text file in the log directory
    * to avoid recovering the whole log on startup.
    */
-  def checkpointRecoveryPointOffsets() {
-    this.logDirs.foreach(checkpointLogRecoveryOffsetsInDir)
+  def checkpointLogRecoveryOffsets() {
+    liveLogDirs.foreach(checkpointLogRecoveryOffsetsInDir)
   }
 
   /**
@@ -373,7 +487,7 @@ class LogManager(val logDirs: Array[File],
    * to avoid exposing data that have been deleted by DeleteRecordsRequest
    */
   def checkpointLogStartOffsets() {
-    this.logDirs.foreach(checkpointLogStartOffsetsInDir)
+    liveLogDirs.foreach(checkpointLogStartOffsetsInDir)
   }
 
   /**
@@ -382,7 +496,13 @@ class LogManager(val logDirs: Array[File],
   private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
     val recoveryPoints = this.logsByDir.get(dir.toString)
     if (recoveryPoints.isDefined) {
-      this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
+      try {
+        this.recoveryPointCheckpoints.get(dir).foreach(_.write(recoveryPoints.get.mapValues(_.recoveryPoint)))
+      } catch {
+        case e: IOException =>
+          error(s"Disk error while writing to recovery point file in directory $dir", e)
+          logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath)
+      }
     }
   }
 
@@ -390,10 +510,17 @@ class LogManager(val logDirs: Array[File],
    * Checkpoint log start offset for all logs in provided directory.
    */
   private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
-    val logs = this.logsByDir.get(dir.toString)
+    val logs = this.logsByDir.get(dir.getAbsolutePath)
     if (logs.isDefined) {
-      this.logStartOffsetCheckpoints(dir).write(
-        logs.get.filter{case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset}.mapValues(_.logStartOffset))
+      try {
+        this.logStartOffsetCheckpoints.get(dir).foreach(_.write(
+          logs.get.filter { case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset }.mapValues(_.logStartOffset)
+        ))
+      } catch {
+        case e: IOException =>
+          error(s"Disk error while writing to logStartOffset file in directory $dir", e)
+          logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath)
+      }
     }
   }
 
@@ -403,33 +530,47 @@ class LogManager(val logDirs: Array[File],
   def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition))
 
   /**
-   * Create a log for the given topic and the given partition
    * If the log already exists, just return the existing log
+   * Otherwise, if isNew=true or there is no offline log directory, create a log for the given topic and partition
+   * Otherwise throw KafkaStorageException
+   *
+   * @param isNew Whether the replica should have existed on the broker or not
+   * @throws KafkaStorageException if isNew=false, the log is not found in the cache, and there is an offline log directory on the broker
    */
-  def createLog(topicPartition: TopicPartition, config: LogConfig): Log = {
+  def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false): Log = {
     logCreationOrDeletionLock synchronized {
-      // create the log if it has not already been created in another thread
       getLog(topicPartition).getOrElse {
+        // create the log if it has not already been created in another thread
+        if (!isNew && offlineLogDirs.nonEmpty)
+          throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")
+
         val dataDir = nextLogDir()
-        val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
-        Files.createDirectories(dir.toPath)
-
-        val log = Log(
-          dir = dir,
-          config = config,
-          logStartOffset = 0L,
-          recoveryPoint = 0L,
-          maxProducerIdExpirationMs = maxPidExpirationMs,
-          scheduler = scheduler,
-          time = time,
-          brokerTopicStats = brokerTopicStats)
-        logs.put(topicPartition, log)
-        info("Created log for partition [%s,%d] in %s with properties {%s}."
-          .format(topicPartition.topic,
-            topicPartition.partition,
-            dataDir.getAbsolutePath,
-            config.originals.asScala.mkString(", ")))
-        log
+        try {
+          val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
+          Files.createDirectories(dir.toPath)
+
+          val log = Log(
+            dir = dir,
+            config = config,
+            logStartOffset = 0L,
+            recoveryPoint = 0L,
+            maxProducerIdExpirationMs = maxPidExpirationMs,
+            scheduler = scheduler,
+            time = time,
+            brokerTopicStats = brokerTopicStats)
+          logs.put(topicPartition, log)
+
+          info("Created log for partition [%s,%d] in %s with properties {%s}."
+            .format(topicPartition.topic,
+              topicPartition.partition,
+              dataDir.getAbsolutePath,
+              config.originals.asScala.mkString(", ")))
+          log
+        } catch {
+          case e: IOException =>
+            logDirFailureChannel.maybeAddLogFailureEvent(dataDir.getAbsolutePath)
+            throw new KafkaStorageException(s"Error while creating log for $topicPartition in dir ${dataDir.getAbsolutePath}", e)
+        }
       }
     }
   }
@@ -439,30 +580,27 @@ class LogManager(val logDirs: Array[File],
    */
   private def deleteLogs(): Unit = {
     try {
-      var failed = 0
-      while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) {
+      while (!logsToBeDeleted.isEmpty) {
         val removedLog = logsToBeDeleted.take()
         if (removedLog != null) {
           try {
             removedLog.delete()
             info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
           } catch {
-            case e: Throwable =>
-              error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e)
-              failed = failed + 1
-              logsToBeDeleted.put(removedLog)
+            case e: KafkaStorageException =>
+              error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
           }
         }
       }
     } catch {
-      case e: Throwable => 
+      case e: Throwable =>
         error(s"Exception in kafka-delete-logs thread.", e)
     }
   }
 
   /**
-    * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and 
-    * add it in the queue for deletion. 
+    * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and
+    * add it in the queue for deletion.
     * @param topicPartition TopicPartition that needs to be deleted
     */
   def asyncDelete(topicPartition: TopicPartition) = {
@@ -470,30 +608,38 @@ class LogManager(val logDirs: Array[File],
       logs.remove(topicPartition)
     }
     if (removedLog != null) {
-      //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
-      if (cleaner != null) {
-        cleaner.abortCleaning(topicPartition)
-        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
-      }
-      val dirName = Log.logDeleteDirName(removedLog.name)
-      removedLog.close()
-      val renamedDir = new File(removedLog.dir.getParent, dirName)
-      val renameSuccessful = removedLog.dir.renameTo(renamedDir)
-      if (renameSuccessful) {
-        checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
-        removedLog.dir = renamedDir
-        // change the file pointers for log and index file
-        for (logSegment <- removedLog.logSegments) {
-          logSegment.log.setFile(new File(renamedDir, logSegment.log.file.getName))
-          logSegment.index.file = new File(renamedDir, logSegment.index.file.getName)
+      try {
+        // We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
+        if (cleaner != null) {
+          cleaner.abortCleaning(topicPartition)
+          cleaner.updateCheckpoints(removedLog.dir.getParentFile)
         }
+        val dirName = Log.logDeleteDirName(removedLog.name)
+        removedLog.close()
+        val renamedDir = new File(removedLog.dir.getParent, dirName)
+        val renameSuccessful = removedLog.dir.renameTo(renamedDir)
+        if (renameSuccessful) {
+          checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
+          removedLog.dir = renamedDir
+          // change the file pointers for log and index file
+          for (logSegment <- removedLog.logSegments) {
+            logSegment.log.setFile(new File(renamedDir, logSegment.log.file.getName))
+            logSegment.index.file = new File(renamedDir, logSegment.index.file.getName)
+          }
 
-        logsToBeDeleted.add(removedLog)
-        removedLog.removeLogMetrics()
-        info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
-      } else {
-        throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
+          logsToBeDeleted.add(removedLog)
+          removedLog.removeLogMetrics()
+          info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
+        } else {
+          throw new IOException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
+        }
+      } catch {
+        case e: IOException =>
+          logDirFailureChannel.maybeAddLogFailureEvent(removedLog.dir.getParent)
+          throw new KafkaStorageException(s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}.", e)
       }
+    } else if (offlineLogDirs.nonEmpty) {
+      throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))
     }
   }
 
@@ -503,14 +649,14 @@ class LogManager(val logDirs: Array[File],
    * data directory with the fewest partitions.
    */
   private def nextLogDir(): File = {
-    if(logDirs.size == 1) {
-      logDirs(0)
+    if(_liveLogDirs.size == 1) {
+      _liveLogDirs.peek()
     } else {
      // count the number of logs in each parent directory (including 0 for empty directories)
       val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
-      val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
+      val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
       val dirCounts = (zeros ++ logCounts).toBuffer
-    
+
       // choose the directory with the least logs in it
       val leastLoaded = dirCounts.sortBy(_._2).head
       new File(leastLoaded._1)
@@ -552,6 +698,13 @@ class LogManager(val logDirs: Array[File],
     }
   }
 
+  def isLogDirOnline(logDir: String): Boolean = {
+    if (!logDirs.exists(_.getAbsolutePath == logDir))
+      throw new RuntimeException(s"Log dir $logDir is not found in the config.")
+
+    _liveLogDirs.contains(new File(logDir))
+  }
+
   /**
    * Flush any log which has exceeded its flush interval and has unwritten messages.
    */
@@ -575,11 +728,13 @@ class LogManager(val logDirs: Array[File],
 
 object LogManager {
   def apply(config: KafkaConfig,
+            initialOfflineDirs: Seq[String],
             zkUtils: ZkUtils,
             brokerState: BrokerState,
             kafkaScheduler: KafkaScheduler,
             time: Time,
-            brokerTopicStats: BrokerTopicStats): LogManager = {
+            brokerTopicStats: BrokerTopicStats,
+            logDirFailureChannel: LogDirFailureChannel): LogManager = {
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
     val defaultLogConfig = LogConfig(defaultProps)
 
@@ -598,6 +753,7 @@ object LogManager {
       enableCleaner = config.logCleanerEnable)
 
     new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
+      initialOfflineDirs = initialOfflineDirs.map(new File(_)).toArray,
       topicConfigs = topicConfigs,
       defaultConfig = defaultLogConfig,
       cleanerConfig = cleanerConfig,
@@ -609,7 +765,8 @@ object LogManager {
       maxPidExpirationMs = config.transactionIdExpirationMs,
       scheduler = kafkaScheduler,
       brokerState = brokerState,
-      time = time,
-      brokerTopicStats = brokerTopicStats)
+      brokerTopicStats = brokerTopicStats,
+      logDirFailureChannel = logDirFailureChannel,
+      time = time)
   }
 }

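Most of the LogManager changes revolve around one piece of bookkeeping: the set of live
log directories (a ConcurrentLinkedQueue), the derived offline set, per-directory
gauges, and a hard halt once the last directory fails. A stripped-down model of that
bookkeeping, with illustrative names rather than Kafka's:

    import java.io.File
    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConverters._

    class LiveDirTrackerSketch(allDirs: Seq[File]) {
      private val live = new ConcurrentLinkedQueue[File]()
      allDirs.foreach(live.add)

      // fast path: if nothing has failed yet, hand back the original sequence
      def liveLogDirs: Seq[File] =
        if (live.size == allDirs.size) allDirs else live.asScala.toSeq

      def offlineLogDirs: Seq[File] = allDirs.filterNot(live.contains)

      // mirrors handleLogDirFailure: drop the dir, halt the process if none remain
      def handleLogDirFailure(dir: String): Unit = {
        live.remove(new File(dir))
        if (live.isEmpty) {
          System.err.println(s"all log dirs in ${allDirs.mkString(", ")} have failed")
          Runtime.getRuntime.halt(1)
        }
      }
    }

Halting (rather than a normal shutdown) mirrors the Exit.halt(1) calls in the patch;
presumably a clean shutdown path could itself block on the failed disks.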
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 3e4c47d..0449a4a 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -20,8 +20,6 @@ import java.io.{File, IOException}
 import java.nio.file.Files
 import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
-
-import kafka.common._
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
@@ -383,28 +381,13 @@ class LogSegment(val log: FileRecords,
 
   /**
    * Change the suffix for the index and log file for this log segment
+   * IOExceptions thrown by this method should be handled by the caller
    */
   def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
-
-    def kafkaStorageException(fileType: String, e: IOException) =
-      new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)
-
-    try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
-    catch {
-      case e: IOException => throw kafkaStorageException("log", e)
-    }
-    try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
-    catch {
-      case e: IOException => throw kafkaStorageException("index", e)
-    }
-    try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
-    catch {
-      case e: IOException => throw kafkaStorageException("timeindex", e)
-    }
-    try txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
-    catch {
-      case e: IOException => throw kafkaStorageException("txnindex", e)
-    }
+    log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
+    index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
+    timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
+    txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
   }
 
   /**
@@ -481,9 +464,17 @@ class LogSegment(val log: FileRecords,
   }
 
   /**
+    * Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed.
+    */
+  def closeHandlers() {
+    CoreUtils.swallow(index.closeHandler())
+    CoreUtils.swallow(timeIndex.closeHandler())
+    CoreUtils.swallow(log.closeHandlers())
+    CoreUtils.swallow(txnIndex.close())
+  }
+
+  /**
    * Delete this log segment from the filesystem.
-   *
-   * @throws KafkaStorageException if the delete fails.
    */
   def delete() {
     val deletedLog = log.delete()
@@ -491,13 +482,13 @@ class LogSegment(val log: FileRecords,
     val deletedTimeIndex = timeIndex.delete()
     val deletedTxnIndex = txnIndex.delete()
     if (!deletedLog && log.file.exists)
-      throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
+      throw new IOException("Delete of log " + log.file.getName + " failed.")
     if (!deletedIndex && index.file.exists)
-      throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
+      throw new IOException("Delete of index " + index.file.getName + " failed.")
     if (!deletedTimeIndex && timeIndex.file.exists)
-      throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")
+      throw new IOException("Delete of time index " + timeIndex.file.getName + " failed.")
     if (!deletedTxnIndex && txnIndex.file.exists)
-      throw new KafkaStorageException("Delete of transaction index " + txnIndex.file.getName + " failed.")
+      throw new IOException("Delete of transaction index " + txnIndex.file.getName + " failed.")
   }
 
   /**

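After this change, changeFileSuffixes and delete() surface raw IOExceptions for the
caller to route into the failure channel, while the new closeHandlers() deliberately
swallows errors so one bad file handle does not prevent releasing the rest. An
approximation of that swallow-and-continue close (Kafka's CoreUtils.swallow logs
through the normal logger; this sketch just prints):

    object CloseSketch {
      // run a cleanup action; log and continue rather than propagate
      def swallow(action: => Unit): Unit =
        try action
        catch { case e: Throwable => System.err.println(s"swallowed during close: $e") }

      def closeHandlers(closers: (() => Unit)*): Unit =
        closers.foreach(c => swallow(c()))

      def main(args: Array[String]): Unit =
        closeHandlers(
          () => throw new java.io.IOException("index handle already invalid"),
          () => println("log handle closed")) // still runs despite the failure above
    }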
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 7cc8e8e..ce56a6c 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -638,9 +638,13 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   private def listSnapshotFiles: List[File] = {
-    if (logDir.exists && logDir.isDirectory)
-      logDir.listFiles.filter(f => f.isFile && isSnapshotFile(f.getName)).toList
-    else
+    if (logDir.exists && logDir.isDirectory) {
+      val files = logDir.listFiles
+      if (files != null)
+        files.filter(f => f.isFile && isSnapshotFile(f.getName)).toList
+      else
+        List.empty[File]
+    } else
       List.empty[File]
   }
 

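The added guard matters because java.io.File.listFiles returns null, not an empty array, when the directory cannot be read (for example after a disk failure). A standalone sketch of the same null-safe listing:

    import java.io.File

    def safeListFiles(dir: File): List[File] =
      if (dir.exists && dir.isDirectory)
        // listFiles is null on IO error, so wrap it before filtering
        Option(dir.listFiles).map(_.toList).getOrElse(Nil)
      else
        Nil
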
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b17d255..9b01043 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,22 +18,19 @@
 package kafka.server
 
 import java.util.concurrent.locks.ReentrantLock
-
 import kafka.cluster.BrokerEndPoint
-import kafka.consumer.PartitionTopicInfo
 import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
+import org.apache.kafka.common.errors.KafkaStorageException
 import kafka.common.{ClientIdAndBroker, KafkaException}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.protocol.Errors
 import AbstractFetcherThread._
-
 import scala.collection.{Map, Set, mutable}
 import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
-
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
@@ -198,7 +195,10 @@ abstract class AbstractFetcherThread(name: String,
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                       // should get fixed in the subsequent fetches
                       logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.fetchOffset  + " error " + ime.getMessage)
-                      updatePartitionsWithError(topicPartition);
+                      updatePartitionsWithError(topicPartition)
+                    case e: KafkaStorageException =>
+                      logger.error(s"Error while processing data for partition $topicPartition", e)
+                      updatePartitionsWithError(topicPartition)
                     case e: Throwable =>
                       throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                         .format(topic, partitionId, currentPartitionFetchState.fetchOffset), e)

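The new case means a storage error on one partition no longer falls through to the generic Throwable branch, which would throw and kill the fetcher thread; the partition is instead marked for retry. A simplified sketch of that dispatch, with markForRetry standing in for updatePartitionsWithError:

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.errors.KafkaStorageException

    def processSafely(tp: TopicPartition, markForRetry: TopicPartition => Unit)
                     (body: => Unit): Unit =
      try body
      catch {
        case e: KafkaStorageException =>
          // log and retry later; the fetcher thread itself keeps running
          System.err.println(s"storage error for $tp: ${e.getMessage}")
          markForRetry(tp)
        case e: Throwable =>
          throw new RuntimeException(s"error processing data for $tp", e)
      }
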
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 8630026..8ac9864 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -30,7 +30,6 @@ case class BrokerMetadata(brokerId: Int)
   */
 class BrokerMetadataCheckpoint(val file: File) extends Logging {
   private val lock = new Object()
-  Files.deleteIfExists(new File(file + ".tmp").toPath()) // try to delete any existing temp files for cleanliness
 
   def write(brokerMetadata: BrokerMetadata) = {
     lock synchronized {
@@ -57,6 +56,8 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
   }
 
   def read(): Option[BrokerMetadata] = {
+    Files.deleteIfExists(new File(file + ".tmp").toPath()) // try to delete any existing temp files for cleanliness
+
     lock synchronized {
       try {
         val brokerMetaProps = new VerifiableProperties(Utils.loadProps(file.getAbsolutePath()))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index e5b301c..a6a8202 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -20,9 +20,7 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
 
-import com.yammer.metrics.core.Meter
 import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.Pool
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.DeleteRecordsResponse
@@ -76,12 +74,16 @@ class DelayedDeleteRecords(delayMs: Long,
       if (status.acksPending) {
         val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match {
           case Some(partition) =>
-            partition.leaderReplicaIfLocal match {
-              case Some(_) =>
-                val leaderLW = partition.lowWatermarkIfLeader
-                (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
-              case None =>
-                (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+            if (partition eq ReplicaManager.OfflinePartition) {
+              (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+            } else {
+              partition.leaderReplicaIfLocal match {
+                case Some(_) =>
+                  val leaderLW = partition.lowWatermarkIfLeader
+                  (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
+                case None =>
+                  (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+              }
             }
           case None =>
             (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)

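The `partition eq ReplicaManager.OfflinePartition` test relies on OfflinePartition being one shared sentinel instance, so reference equality (`eq`) is the right comparison rather than `==`. A compact standalone illustration of the idiom, with Partitions.Offline standing in for the real sentinel:

    object Partitions { val Offline = new AnyRef } // stand-in sentinel

    def describe(p: AnyRef): String =
      if (p eq Partitions.Offline) "offline" else "live"
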
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 8a9ce02..e478053 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, KafkaStorageException}
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.IsolationLevel
 
@@ -71,6 +71,7 @@ class DelayedFetch(delayMs: Long,
    * Case B: This broker does not know of some partitions it tries to fetch
    * Case C: The fetch offset locates not on the last segment of the log
    * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+   * Case E: The partition is in an offline log directory on this broker
    *
    * Upon completion, should return whatever data is available for each valid partition
    */
@@ -117,6 +118,9 @@ class DelayedFetch(delayMs: Long,
             }
           }
         } catch {
+          case _: KafkaStorageException => // Case E
+            debug("Partition %s is in an offline log directory, satisfy %s immediately".format(topicPartition, fetchMetadata))
+            return forceComplete()
           case _: UnknownTopicOrPartitionException => // Case B
             debug("Broker no longer know of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
             return forceComplete()

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 0ff8d34..0d452cc 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -95,7 +95,10 @@ class DelayedProduce(delayMs: Long,
       if (status.acksPending) {
         val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
           case Some(partition) =>
-            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
+            if (partition eq ReplicaManager.OfflinePartition)
+              (false, Errors.KAFKA_STORAGE_ERROR)
+            else
+              partition.checkEnoughReplicasReachOffset(status.requiredOffset)
           case None =>
             // Case A
             (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e9299f..1fb8901 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import kafka.admin.{AdminUtils, RackAwareMode}
 import kafka.api.{ApiVersion, ControlledShutdownRequest, ControlledShutdownResponse, KAFKA_0_11_0_IV0}
 import kafka.cluster.Partition
-import kafka.common.{KafkaStorageException, OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
+import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
@@ -36,7 +36,7 @@ import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.{RequestChannel, RequestOrResponseSend}
 import kafka.security.SecurityUtils
 import kafka.security.auth._
-import kafka.utils.{CoreUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
@@ -54,7 +54,7 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse
 import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 
-import scala.collection._
+import scala.collection.{mutable, _}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
@@ -144,40 +144,33 @@ class KafkaApis(val requestChannel: RequestChannel,
     val correlationId = request.header.correlationId
     val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
 
-    try {
-      def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
-        // for each new leader or follower, call coordinator to handle consumer group migration.
-        // this callback is invoked under the replica state change lock to ensure proper order of
-        // leadership changes
-        updatedLeaders.foreach { partition =>
-          if (partition.topic == GROUP_METADATA_TOPIC_NAME)
-            groupCoordinator.handleGroupImmigration(partition.partitionId)
-          else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
-            txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch)
-        }
-
-        updatedFollowers.foreach { partition =>
-          if (partition.topic == GROUP_METADATA_TOPIC_NAME)
-            groupCoordinator.handleGroupEmigration(partition.partitionId)
-          else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
-            txnCoordinator.handleTxnEmigration(partition.partitionId, partition.getLeaderEpoch)
-        }
+    def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
+      // for each new leader or follower, call coordinator to handle consumer group migration.
+      // this callback is invoked under the replica state change lock to ensure proper order of
+      // leadership changes
+      updatedLeaders.foreach { partition =>
+        if (partition.topic == GROUP_METADATA_TOPIC_NAME)
+          groupCoordinator.handleGroupImmigration(partition.partitionId)
+        else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
+          txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch)
       }
 
-      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-        val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
-        val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
-        sendResponseExemptThrottle(RequestChannel.Response(request, leaderAndIsrResponse))
-      } else {
-        val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
-        sendResponseMaybeThrottle(request, _ =>
-          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
+      updatedFollowers.foreach { partition =>
+        if (partition.topic == GROUP_METADATA_TOPIC_NAME)
+          groupCoordinator.handleGroupEmigration(partition.partitionId)
+        else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
+          txnCoordinator.handleTxnEmigration(partition.partitionId, partition.getLeaderEpoch)
       }
-    } catch {
-      case e: FatalExitError => throw e
-      case e: KafkaStorageException =>
-        fatal("Disk error during leadership change.", e)
-        Exit.halt(1)
+    }
+
+    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+      val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
+      sendResponseExemptThrottle(RequestChannel.Response(request, leaderAndIsrResponse))
+    } else {
+      val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+      sendResponseMaybeThrottle(request, _ =>
+        new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
     }
   }
 
@@ -681,7 +674,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       } catch {
         // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
         // are typically transient and there is no value in logging the entire stack trace for the same
-        case e @ ( _ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException) =>
+        case e @ (_ : UnknownTopicOrPartitionException |
+                  _ : NotLeaderForPartitionException |
+                  _ : KafkaStorageException) =>
           debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
             correlationId, clientId, topicPartition, e.getMessage))
           (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
@@ -750,6 +745,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           // would have received a clear exception and there is no value in logging the entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
+                    _ : KafkaStorageException |
                     _ : UnsupportedForMessageFormatException) =>
             debug(s"Offset request with correlation id $correlationId from client $clientId on " +
                 s"partition $topicPartition failed due to ${e.getMessage}")
@@ -1527,7 +1523,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           case e: Exception =>
             error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
             val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
-            successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN))
+            successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
             updateErrors(producerId, updatedErrors)
         }
       }
@@ -1865,7 +1861,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 Some(new AclDeletionResult(ApiError.fromThrowable(throwable), aclBinding))
             }
           }.asJava
-          
+
           filterResponseMap.put(i, new AclFilterResponse(deletionResults))
         }
 

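Both ListOffset hunks above use the `case e @ (_: A | _: B | _: C)` form, which binds the caught Throwable to `e` while accepting any of the listed types, keeping all the transient, log-at-debug cases in a single branch. A standalone sketch with unrelated exception types:

    def riskyCall(): Unit = throw new java.io.IOException("disk gone")

    try riskyCall()
    catch {
      // one branch, several exception types, `e` still bound
      case e @ (_: java.io.IOException | _: IllegalStateException) =>
        println(s"transient failure: ${e.getMessage}")
    }
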
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index cc34e14..fc9e4b8 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -47,7 +47,7 @@ import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, mutable}
+import scala.collection.{Seq, Map, mutable}
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -110,6 +110,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
 
+  var logDirFailureChannel: LogDirFailureChannel = null
   var logManager: LogManager = null
 
   var replicaManager: ReplicaManager = null
@@ -195,7 +196,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         info(s"Cluster ID = $clusterId")
 
         /* generate brokerId */
-        config.brokerId =  getBrokerId
+        val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
+        config.brokerId = brokerId
         this.logIdent = "[Kafka Server " + config.brokerId + "], "
 
         /* create and configure metrics */
@@ -211,8 +213,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         quotaManagers = QuotaFactory.instantiate(config, metrics, time)
         notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
 
+        logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
+
         /* start log manager */
-        logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats)
+        logManager = LogManager(config, initialOfflineDirs, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
         logManager.startup()
 
         metadataCache = new MetadataCache(config.brokerId)
@@ -307,7 +311,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
     new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower,
-      brokerTopicStats, metadataCache)
+      brokerTopicStats, metadataCache, logDirFailureChannel)
 
   private def initZk(): ZkUtils = {
     info(s"Connecting to zookeeper on ${config.zkConnect}")
@@ -582,7 +586,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
       if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
         CoreUtils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
-        
+
         if (socketServer != null)
           CoreUtils.swallow(socketServer.shutdown())
         if (requestHandlerPool != null)
@@ -651,16 +655,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     * <li> config has broker.id and there is no meta.properties file, creates new meta.properties and stores broker.id
     * <ol>
     *
-    * @return A brokerId.
+    * The log directories whose meta.properties cannot be accessed due to an IOException will be returned to the caller.
+    *
+    * @return A 2-tuple containing the brokerId and a sequence of offline log directories.
     */
-  private def getBrokerId: Int =  {
+  private def getBrokerIdAndOfflineDirs: (Int, Seq[String]) =  {
     var brokerId = config.brokerId
     val brokerIdSet = mutable.HashSet[Int]()
+    val offlineDirs = mutable.ArrayBuffer.empty[String]
 
     for (logDir <- config.logDirs) {
-      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
-      brokerMetadataOpt.foreach { brokerMetadata =>
-        brokerIdSet.add(brokerMetadata.brokerId)
+      try {
+        val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
+        brokerMetadataOpt.foreach { brokerMetadata =>
+          brokerIdSet.add(brokerMetadata.brokerId)
+        }
+      } catch {
+        case e : IOException =>
+          offlineDirs += logDir
+          error(s"Failed to read ${brokerMetaPropsFile} under log directory ${logDir}", e)
       }
     }
 
@@ -678,16 +691,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
       brokerId = brokerIdSet.last
 
-    brokerId
+
+    (brokerId, offlineDirs)
   }
 
   private def checkpointBrokerId(brokerId: Int) {
     var logDirsWithoutMetaProps: List[String] = List()
 
-    for (logDir <- config.logDirs) {
-      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
+    for (logDir <- logManager.liveLogDirs) {
+      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir.getAbsolutePath).read()
       if(brokerMetadataOpt.isEmpty)
-          logDirsWithoutMetaProps ++= List(logDir)
+        logDirsWithoutMetaProps ++= List(logDir.getAbsolutePath)
     }
 
     for(logDir <- logDirsWithoutMetaProps) {

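The shape of getBrokerIdAndOfflineDirs is a per-directory try/catch that collects readable broker ids and records directories whose meta.properties threw an IOException, instead of failing startup outright. A minimal sketch of that scan, with readBrokerId standing in for the checkpoint read:

    import java.io.IOException
    import scala.collection.mutable

    def scanLogDirs(logDirs: Seq[String],
                    readBrokerId: String => Option[Int]): (Set[Int], Seq[String]) = {
      val ids = mutable.HashSet[Int]()
      val offline = mutable.ArrayBuffer.empty[String]
      for (dir <- logDirs) {
        try readBrokerId(dir).foreach(ids.add)
        catch { case _: IOException => offline += dir } // dir is offline
      }
      (ids.toSet, offline)
    }
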
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
new file mode 100644
index 0000000..23d9986
--- /dev/null
+++ b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
@@ -0,0 +1,55 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+
+package kafka.server
+
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
+
+/*
+ * LogDirFailureChannel allows an external thread to block waiting for new offline log dirs.
+ *
+ * LogDirFailureChannel should be a singleton object which can be accessed by any class that performs disk I/O operations.
+ * If an IOException is encountered while accessing a log directory, the corresponding class can insert the log directory name
+ * into the LogDirFailureChannel using maybeAddLogFailureEvent(). Then a thread which is blocked waiting for new offline log directories
+ * can take the name of the new offline log directory out of the LogDirFailureChannel and handle the log failure properly.
+ *
+ */
+class LogDirFailureChannel(logDirNum: Int) {
+
+  private val offlineLogDirs = new ConcurrentHashMap[String, String]
+  private val logDirFailureEvent = new ArrayBlockingQueue[String](logDirNum)
+
+  /*
+   * If the given logDir is not already offline, add it to the
+   * set of offline log dirs and enqueue it to the logDirFailureEvent queue
+   */
+  def maybeAddLogFailureEvent(logDir: String): Unit = {
+    if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) {
+      logDirFailureEvent.add(logDir)
+    }
+  }
+
+  /*
+   * Get the next offline log dir from the logDirFailureEvent queue.
+   * The method will wait if necessary until a new offline log directory becomes available.
+   */
+  def takeNextLogFailureEvent(): String = {
+    logDirFailureEvent.take()
+  }
+
+}

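A hedged usage sketch for the new channel: one dedicated thread blocks on takeNextLogFailureEvent() while any IO path that hits an IOException reports the directory via maybeAddLogFailureEvent(). The demo object, thread name, and println are illustrative; in this patch the actual consumer is a handler thread in the replica manager:

    object LogDirFailureDemo extends App {
      val channel = new kafka.server.LogDirFailureChannel(2)

      val handler = new Thread(new Runnable {
        def run(): Unit = while (true) {
          // blocks until some IO path reports a failed directory
          val dir = channel.takeNextLogFailureEvent()
          println(s"taking log dir offline: $dir")
        }
      }, "log-dir-failure-handler")
      handler.setDaemon(true)
      handler.start()

      // on an IO path, after catching an IOException:
      channel.maybeAddLogFailureEvent("/data/kafka-logs-1")
    }
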
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 466645b..2c28df7 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
@@ -39,7 +39,7 @@ import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, Updat
  */
 class MetadataCache(brokerId: Int) extends Logging {
   private val stateChangeLogger = KafkaController.stateChangeLogger
-  private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]()
+  private val cache = mutable.Map[String, mutable.Map[Int, MetadataPartitionState]]()
   private var controllerId: Option[Int] = None
   private val aliveBrokers = mutable.Map[Int, Broker]()
   private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]()
@@ -73,12 +73,13 @@ class MetadataCache(brokerId: Int) extends Logging {
 
         val replicas = partitionState.allReplicas
         val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
+        val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas, listenerName, errorUnavailableEndpoints)
 
         maybeLeader match {
           case None =>
             debug(s"Error while fetching metadata for $topicPartition: leader not available")
             new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(),
-              replicaInfo.asJava, java.util.Collections.emptyList())
+              replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava)
 
           case Some(leader) =>
             val isr = leaderAndIsr.isr
@@ -89,15 +90,15 @@ class MetadataCache(brokerId: Int) extends Logging {
                 s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
 
               new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
-                replicaInfo.asJava, isrInfo.asJava)
+                replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
             } else if (isrInfo.size < isr.size) {
               debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
                 s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
               new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
-                replicaInfo.asJava, isrInfo.asJava)
+                replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
             } else {
               new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId, leader, replicaInfo.asJava,
-                isrInfo.asJava)
+                isrInfo.asJava, offlineReplicaInfo.asJava)
             }
         }
       }
@@ -147,14 +148,14 @@ class MetadataCache(brokerId: Int) extends Logging {
 
   private def addOrUpdatePartitionInfo(topic: String,
                                        partitionId: Int,
-                                       stateInfo: PartitionStateInfo) {
+                                       stateInfo: MetadataPartitionState) {
     inWriteLock(partitionMetadataLock) {
       val infos = cache.getOrElseUpdate(topic, mutable.Map())
       infos(partitionId) = stateInfo
     }
   }
 
-  def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = {
+  def getPartitionInfo(topic: String, partitionId: Int): Option[MetadataPartitionState] = {
     inReadLock(partitionMetadataLock) {
       cache.get(topic).flatMap(_.get(partitionId))
     }
@@ -223,10 +224,10 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  private def partitionStateToPartitionStateInfo(partitionState: PartitionState): PartitionStateInfo = {
+  private def partitionStateToPartitionStateInfo(partitionState: UpdateMetadataRequest.PartitionState): MetadataPartitionState = {
     val leaderAndIsr = LeaderAndIsr(partitionState.leader, partitionState.leaderEpoch, partitionState.isr.asScala.map(_.toInt).toList, partitionState.zkVersion)
     val leaderInfo = LeaderIsrAndControllerEpoch(leaderAndIsr, partitionState.controllerEpoch)
-    PartitionStateInfo(leaderInfo, partitionState.replicas.asScala.map(_.toInt))
+    MetadataPartitionState(leaderInfo, partitionState.replicas.asScala.map(_.toInt), partitionState.offlineReplicas.asScala.map(_.toInt))
   }
 
   def contains(topic: String): Boolean = {

