kafka-commits mailing list archives

From j...@apache.org
Subject [4/5] kafka git commit: KAFKA-4763; Handle disk failure for JBOD (KIP-112)
Date Sat, 22 Jul 2017 19:36:05 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ee8fa1e..bb3b14b 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint
-import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, TopicPartition, requests}
@@ -281,10 +281,10 @@ class RequestSendThread(val controllerId: Int,
 class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging {
   val controllerContext = controller.controllerContext
   val controllerId: Int = controller.config.brokerId
-  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
+  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, LeaderAndIsrPartitionState]]
   val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
   val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
-  val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, PartitionStateInfo]
+  val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, MetadataPartitionState]
   private val stateChangeLogger = KafkaController.stateChangeLogger
 
   def newBatch() {
@@ -310,12 +310,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                                       replicas: Seq[Int], callback: AbstractResponse => Unit = null) {
+                                       replicas: Seq[Int], isNew: Boolean = false) {
     val topicPartition = new TopicPartition(topic, partition)
 
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
-      result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas))
+      val alreadyNew = result.get(topicPartition).exists(_.isNew)
+      result.put(topicPartition, LeaderAndIsrPartitionState(leaderIsrAndControllerEpoch, replicas, isNew || alreadyNew))
     }
 
     addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
@@ -345,7 +346,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
       leaderIsrAndControllerEpochOpt match {
         case Some(l @ LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)) =>
           val replicas = controllerContext.partitionReplicaAssignment(partition)
-
+          val offlineReplicas = replicas.filter(!controllerContext.isReplicaOnline(_, partition))
           val leaderIsrAndControllerEpoch = if (beingDeleted) {
             val leaderDuringDelete = LeaderAndIsr.duringDelete(leaderAndIsr.isr)
             LeaderIsrAndControllerEpoch(leaderDuringDelete, controllerEpoch)
@@ -353,7 +354,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
             l
           }
 
-          val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
+          val partitionStateInfo = MetadataPartitionState(leaderIsrAndControllerEpoch, replicas, offlineReplicas)
           updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
 
         case None =>
@@ -379,8 +380,12 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
 
   def sendRequestsToBrokers(controllerEpoch: Int) {
     try {
-      leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
-        partitionStateInfos.foreach { case (topicPartition, state) =>
+      val leaderAndIsrRequestVersion: Short =
+        if (controller.config.interBrokerProtocolVersion >= KAFKA_0_11_1_IV0) 1
+        else 0
+
+      leaderAndIsrRequestMap.foreach { case (broker, leaderAndIsrPartitionStates) =>
+        leaderAndIsrPartitionStates.foreach { case (topicPartition, state) =>
           val typeOfRequest =
             if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader"
             else "become-follower"
@@ -389,20 +394,21 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
                                                                    state.leaderIsrAndControllerEpoch, broker,
                                                                    topicPartition.topic, topicPartition.partition))
         }
-        val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
+        val leaderIds = leaderAndIsrPartitionStates.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
         val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
           _.getNode(controller.config.interBrokerListenerName)
         }
-        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
-          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
+        val partitionStates = leaderAndIsrPartitionStates.map { case (topicPartition, leaderAndIsrPartitionState) =>
+          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = leaderAndIsrPartitionState.leaderIsrAndControllerEpoch
           val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
             leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
-            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
+            leaderAndIsrPartitionState.allReplicas.map(Integer.valueOf).asJava, leaderAndIsrPartitionState.isNew)
           topicPartition -> partitionState
         }
-        val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(controllerId, controllerEpoch, partitionStates.asJava,
-          leaders.asJava)
-        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest)
+        val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
+          controllerEpoch, partitionStates.asJava, leaders.asJava)
+        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequestBuilder,
+          (r: AbstractResponse) => controller.eventManager.put(controller.LeaderAndIsrResponseReceived(r, broker)))
       }
       leaderAndIsrRequestMap.clear()
 
@@ -411,20 +417,21 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
         updateMetadataRequestBrokerSet.toString(), p._1)))
       val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) =>
         val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
-        val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
+        val partitionState = new UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader,
           leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
-          partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
+          partitionStateInfo.allReplicas.map(Integer.valueOf).asJava, partitionStateInfo.offlineReplicas.map(Integer.valueOf).asJava)
         topicPartition -> partitionState
       }
 
-      val version: Short =
-        if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
+      val updateMetadataRequestVersion: Short =
+        if (controller.config.interBrokerProtocolVersion >= KAFKA_0_11_1_IV0) 4
+        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
         else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
         else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
         else 0
 
       val updateMetadataRequest = {
-        val liveBrokers = if (version == 0) {
+        val liveBrokers = if (updateMetadataRequestVersion == 0) {
           // Version 0 of UpdateMetadataRequest only supports PLAINTEXT.
           controllerContext.liveOrShuttingDownBrokers.map { broker =>
             val securityProtocol = SecurityProtocol.PLAINTEXT
@@ -441,7 +448,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
             new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
           }
         }
-        new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, partitionStates.asJava,
+        new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, partitionStates.asJava,
           liveBrokers.asJava)
       }
 

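The hunks above gate the new request fields behind the inter-broker protocol version: LeaderAndIsrRequest gains an is_new flag (version 1) and UpdateMetadataRequest gains per-partition offline_replicas (version 4), and the controller only uses the newer versions once inter.broker.protocol.version is at least KAFKA_0_11_1_IV0. A minimal, self-contained sketch of that gating pattern (using a stand-in InterBrokerVersion type rather than Kafka's ApiVersion) might look like this:

object RequestVersionGating {

  // Stand-in for kafka.api.ApiVersion; comparing ids mirrors `>=` on ApiVersion.
  sealed abstract class InterBrokerVersion(val id: Int)
  case object KAFKA_0_9_0      extends InterBrokerVersion(0)
  case object KAFKA_0_10_0_IV1 extends InterBrokerVersion(1)
  case object KAFKA_0_10_2_IV0 extends InterBrokerVersion(2)
  case object KAFKA_0_11_1_IV0 extends InterBrokerVersion(3)

  // UpdateMetadataRequest v4 adds offline_replicas; v0 only supports PLAINTEXT listeners.
  def updateMetadataRequestVersion(ibp: InterBrokerVersion): Short =
    if (ibp.id >= KAFKA_0_11_1_IV0.id) 4
    else if (ibp.id >= KAFKA_0_10_2_IV0.id) 3
    else if (ibp.id >= KAFKA_0_10_0_IV1.id) 2
    else if (ibp.id >= KAFKA_0_9_0.id) 1
    else 0

  // LeaderAndIsrRequest v1 adds the per-partition is_new flag.
  def leaderAndIsrRequestVersion(ibp: InterBrokerVersion): Short =
    if (ibp.id >= KAFKA_0_11_1_IV0.id) 1 else 0

  def main(args: Array[String]): Unit = {
    assert(updateMetadataRequestVersion(KAFKA_0_10_2_IV0) == 3)
    assert(leaderAndIsrRequestVersion(KAFKA_0_11_1_IV0) == 1)
    println("request versions follow the configured inter-broker protocol version")
  }
}
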
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/ControllerState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index 2f690bb..74029b1 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -78,6 +78,15 @@ object ControllerState {
     def value = 9
   }
 
+  case object LeaderAndIsrResponseReceived extends ControllerState {
+    def value = 10
+  }
+
+  case object LogDirChange extends ControllerState {
+    def value = 11
+  }
+
   val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
-    PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange)
+    PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
+    LogDirChange)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ff47f14..3a61a59 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -33,7 +33,7 @@ import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener}
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, StopReplicaResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, StopReplicaResponse, LeaderAndIsrResponse}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
@@ -52,6 +52,7 @@ class ControllerContext(val zkUtils: ZkUtils) {
   var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
   var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
   val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
+  val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicAndPartition]] = mutable.HashMap.empty
 
   private var liveBrokersUnderlying: Set[Broker] = Set.empty
   private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
@@ -75,6 +76,14 @@ class ControllerContext(val zkUtils: ZkUtils) {
     }.toSet
   }
 
+  def isReplicaOnline(brokerId: Int, topicAndPartition: TopicAndPartition, includeShuttingDownBrokers: Boolean = false): Boolean = {
+    val brokerOnline = {
+      if (includeShuttingDownBrokers) liveOrShuttingDownBrokerIds.contains(brokerId)
+      else liveBrokerIds.contains(brokerId)
+    }
+    brokerOnline && !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty).contains(topicAndPartition)
+  }
+
   def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
     brokerIds.flatMap { brokerId =>
       partitionReplicaAssignment.collect {
@@ -98,7 +107,9 @@ class ControllerContext(val zkUtils: ZkUtils) {
     partitionReplicaAssignment.keySet.filter(topicAndPartition => topicAndPartition.topic == topic)
 
   def allLiveReplicas(): Set[PartitionAndReplica] = {
-    replicasOnBrokers(liveBrokerIds)
+    replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica =>
+      isReplicaOnline(partitionAndReplica.replica, TopicAndPartition(partitionAndReplica.topic, partitionAndReplica.partition))
+    }
   }
 
   def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
@@ -175,6 +186,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   private val partitionReassignmentListener = new PartitionReassignmentListener(this, eventManager)
   private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this, eventManager)
   private val isrChangeNotificationListener = new IsrChangeNotificationListener(this, eventManager)
+  private val logDirEventNotificationListener = new LogDirEventNotificationListener(this, eventManager)
 
   @volatile private var activeControllerId = -1
   @volatile private var offlinePartitionCount = 0
@@ -248,6 +260,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     info("Broker %d starting become controller state transition".format(config.brokerId))
     readControllerEpochFromZookeeper()
     incrementControllerEpoch()
+    LogDirUtils.deleteLogDirEvents(zkUtils)
 
     // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
     registerPartitionReassignmentListener()
@@ -256,6 +269,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     registerTopicChangeListener()
     registerTopicDeletionListener()
     registerBrokerChangeListener()
+    registerLogDirEventNotificationListener()
 
     initializeControllerContext()
     val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
@@ -299,6 +313,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     deregisterIsrChangeNotificationListener()
     deregisterPartitionReassignmentListener()
     deregisterPreferredReplicaElectionListener()
+    deregisterLogDirEventNotificationListener()
 
     // reset topic deletion manager
     topicDeletionManager.reset()
@@ -329,6 +344,17 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
    */
   def isActive: Boolean = activeControllerId == config.brokerId
 
+  /*
+   * This callback is invoked by the controller's LogDirEventNotificationListener with the list of broker ids that
+   * have experienced new log directory failures. In response the controller sends a LeaderAndIsrRequest to these
+   * brokers to query the state of their replicas.
+   */
+  def onBrokerLogDirFailure(brokerIds: Seq[Int]) {
+    // send LeaderAndIsrRequest for all replicas on those brokers to see if they are still online.
+    val replicasOnBrokers = controllerContext.replicasOnBrokers(brokerIds.toSet)
+    replicaStateMachine.handleStateChanges(replicasOnBrokers, OnlineReplica)
+  }
+
   /**
    * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
    * brokers as input. It does the following -
@@ -345,6 +371,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
    */
   def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
+    newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
     val newBrokersSet = newBrokers.toSet
     // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
     // broker via this update.
@@ -374,46 +401,55 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     }
   }
 
-  /**
+  /*
    * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers
-   * as input. It does the following -
-   * 1. Mark partitions with dead leaders as offline
-   * 2. Triggers the OnlinePartition state change for all new/offline partitions
-   * 3. Invokes the OfflineReplica state change on the input list of newly started brokers
-   * 4. If no partitions are effected then send UpdateMetadataRequest to live or shutting down brokers
-   *
-   * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point.  This is because
-   * the partition state machine will refresh our cache for us when performing leader election for all new/offline
-   * partitions coming online.
+   * as input. It will call onReplicasBecomeOffline(...) with the list of replicas on those failed brokers.
    */
   def onBrokerFailure(deadBrokers: Seq[Int]) {
     info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
+    deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
     val deadBrokersThatWereShuttingDown =
       deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
     info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
-    val deadBrokersSet = deadBrokers.toSet
-    // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
+    val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
+    onReplicasBecomeOffline(allReplicasOnDeadBrokers)
+  }
+
+  /**
+    * This method marks the given replicas as offline. It does the following -
+    * 1. Mark the given partitions as offline
+    * 2. Triggers the OnlinePartition state change for all new/offline partitions
+    * 3. Invokes the OfflineReplica state change on the input list of newly offline replicas
+    * 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers
+    *
+    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point.  This is because
+    * the partition state machine will refresh our cache for us when performing leader election for all new/offline
+    * partitions coming online.
+    */
+  def onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit = {
+    val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) =
+      newOfflineReplicas.partition(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
+
     val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
-      deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
+      !controllerContext.isReplicaOnline(partitionAndLeader._2.leaderAndIsr.leader, partitionAndLeader._1) &&
         !topicDeletionManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
+
+    // trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
-    // filter out the replicas that belong to topics that are being deleted
-    var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
-    val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
-    // handle dead replicas
-    replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
-    // check if topic deletion state for the dead replicas needs to be updated
-    val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
-    if(replicasForTopicsToBeDeleted.nonEmpty) {
+    // trigger OfflineReplica state change for those newly offline replicas
+    replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion, OfflineReplica)
+
+    // fail deletion of topics that are affected by the offline replicas
+    if (newOfflineReplicasForDeletion.nonEmpty) {
       // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
-      // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
+      // deleted when its log directory is offline. This will prevent the replica from being in TopicDeletionStarted state indefinitely
       // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
-      topicDeletionManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
+      topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
     }
 
-    // If broker failure did not require leader re-election, inform brokers of failed broker
+    // If replica failure did not require leader re-election, inform brokers of the offline replica
     // Note that during leader re-election, brokers update their metadata
     if (partitionsWithoutLeader.isEmpty) {
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
@@ -724,10 +760,11 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
   private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
     val topicsToBeDeleted = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
-    val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case (_, replicas) =>
-      replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
+    val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) =>
+      replicas.exists(r => !controllerContext.isReplicaOnline(r, partition))
+    }.keySet.map(_.topic)
     val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
-    val topicsIneligibleForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress
+    val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress
     info("List of topics to be deleted: %s".format(topicsToBeDeleted.mkString(",")))
     info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(",")))
     (topicsToBeDeleted, topicsIneligibleForDeletion)
@@ -771,7 +808,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
     } else {
       // check if the leader is alive or not
-      if (controllerContext.liveBrokerIds.contains(currentLeader)) {
+      if (controllerContext.isReplicaOnline(currentLeader, topicAndPartition)) {
         info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
           "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
         // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
@@ -909,6 +946,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     }
   }
 
+  private def registerLogDirEventNotificationListener() = {
+    debug("Registering logDirEventNotificationListener")
+    zkUtils.zkClient.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
+  }
+
+  private def deregisterLogDirEventNotificationListener() = {
+    debug("De-registering logDirEventNotificationListener")
+    zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
+  }
+
   private def readControllerEpochFromZookeeper() {
     // initialize the controller epoch and zk version by reading from zookeeper
     if(controllerContext.zkUtils.pathExists(ZkUtils.ControllerEpochPath)) {
@@ -1119,7 +1166,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
         topicsNotInPreferredReplica.keys.foreach { topicPartition =>
           // do this check only if the broker is live and there are no partitions being reassigned currently
           // and preferred replica election is not in progress
-          if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+          if (controllerContext.isReplicaOnline(leaderBroker, topicPartition) &&
             controllerContext.partitionsBeingReassigned.isEmpty &&
             !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
             controllerContext.allTopics.contains(topicPartition.topic)) {
@@ -1352,6 +1399,22 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
   }
 
+  case class LogDirEventNotification(sequenceNumbers: Seq[String]) extends ControllerEvent {
+
+    def state = ControllerState.LogDirChange
+
+    override def process(): Unit = {
+      val zkUtils = controllerContext.zkUtils
+      try {
+        val brokerIds = sequenceNumbers.flatMap(LogDirUtils.getBrokerIdFromLogDirEvent(zkUtils, _))
+        onBrokerLogDirFailure(brokerIds)
+      } finally {
+        // delete processed children
+        sequenceNumbers.map(x => zkUtils.deletePath(ZkUtils.LogDirEventNotificationPath + "/" + x))
+      }
+    }
+  }
+
   case class PreferredReplicaLeaderElection(partitions: Set[TopicAndPartition]) extends ControllerEvent {
 
     def state = ControllerState.ManualLeaderBalance
@@ -1447,7 +1510,35 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     }
   }
 
-  case class TopicDeletionStopReplicaResult(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
+  case class LeaderAndIsrResponseReceived(LeaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent {
+
+    def state = ControllerState.LeaderAndIsrResponseReceived
+
+    override def process(): Unit = {
+      import JavaConverters._
+      val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
+
+      if (leaderAndIsrResponse.error() != Errors.NONE) {
+        stateChangeLogger.error(s"Received error in leaderAndIsrResponse $leaderAndIsrResponse from broker $brokerId")
+        return
+      }
+
+      val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map(
+        tp => TopicAndPartition(tp.topic(), tp.partition())).toSet
+      val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map(
+        tp => TopicAndPartition(tp.topic(), tp.partition())).toSet
+      val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
+      val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
+      controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
+      val newOfflineReplicas = (currentOfflineReplicas -- previousOfflineReplicas).map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId))
+      stateChangeLogger.info(s"Mark replicas ${currentOfflineReplicas -- previousOfflineReplicas} on broker $brokerId as offline")
+
+      if (newOfflineReplicas.nonEmpty)
+        onReplicasBecomeOffline(newOfflineReplicas)
+    }
+  }
+
+  case class TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
 
     def state = ControllerState.TopicDeletion
 
@@ -1607,6 +1698,20 @@ class TopicChangeListener(controller: KafkaController, eventManager: ControllerE
   }
 }
 
+/**
+  * Called when broker notifies controller of log directory change
+  */
+class LogDirEventNotificationListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkChildListener with Logging {
+  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
+    import JavaConverters._
+    eventManager.put(controller.LogDirEventNotification(currentChilds.asScala))
+  }
+}
+
+object LogDirEventNotificationListener {
+  val version: Long = 1L
+}
+
 class PartitionModificationsListener(controller: KafkaController, eventManager: ControllerEventManager, topic: String) extends IZkDataListener with Logging {
   override def handleDataChange(dataPath: String, data: Any): Unit = {
     eventManager.put(controller.PartitionModifications(topic))

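Taken together, the KafkaController changes above introduce per-broker bookkeeping of replicas whose log directory has failed (replicasOnOfflineDirs), populate it from the errors in each LeaderAndIsrResponse, and make isReplicaOnline the single liveness test used by the rest of the controller. A stripped-down, hypothetical model of that bookkeeping (the class and TopicAndPartition below are illustrative stand-ins, not Kafka's types) might be:

import scala.collection.mutable

final case class TopicAndPartition(topic: String, partition: Int)

final class OfflineDirTracker {
  // brokerId -> partitions whose replica sits on a failed log directory of that broker
  private val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicAndPartition]]

  // A replica is online only if its broker is live AND its log directory is healthy.
  def isReplicaOnline(brokerId: Int, tp: TopicAndPartition, liveBrokerIds: Set[Int]): Boolean =
    liveBrokerIds.contains(brokerId) &&
      !replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition]).contains(tp)

  // Mirrors the LeaderAndIsrResponseReceived handling: storage errors mark replicas offline,
  // successful responses mark them online again; returns the replicas that just went offline.
  def updateFromLeaderAndIsrResponse(brokerId: Int,
                                     storageErrors: Set[TopicAndPartition],
                                     healthy: Set[TopicAndPartition]): Set[TopicAndPartition] = {
    val previous = replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
    val current = previous -- healthy ++ storageErrors
    replicasOnOfflineDirs.put(brokerId, current)
    current -- previous
  }

  // A broker restart or failure clears its bookkeeping, as in onBrokerStartup/onBrokerFailure.
  def clear(brokerId: Int): Unit = replicasOnOfflineDirs.remove(brokerId)
}

object OfflineDirTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new OfflineDirTracker
    val tp = TopicAndPartition("payments", 0)
    val live = Set(1, 2)
    println(tracker.isReplicaOnline(1, tp, live))                 // true: broker live, dir healthy
    tracker.updateFromLeaderAndIsrResponse(1, Set(tp), Set.empty) // broker 1 reports a storage error
    println(tracker.isReplicaOnline(1, tp, live))                 // false: replica's log dir is offline
  }
}
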
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 54bbb89..e534ff3 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -54,8 +54,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
   def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
       case Some(assignedReplicas) =>
-        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
-        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
+        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition))
+        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition))
         val newLeaderAndIsr =
           if (liveBrokersInIsr.isEmpty) {
             // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
@@ -63,7 +63,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
             if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
               ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
               throw new NoReplicaOnlineException(
-                s"No broker in ISR for partition $topicAndPartition is alive. Live brokers are: [${controllerContext.liveBrokerIds}], " +
+                s"No replica in ISR for partition $topicAndPartition is alive. Live brokers are: [${controllerContext.liveBrokerIds}], " +
                   s"ISR brokers are: [${currentLeaderAndIsr.isr.mkString(",")}]"
               )
             }
@@ -111,7 +111,7 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
                    currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
     val newLeaderOpt = reassignedInSyncReplicas.find { r =>
-      controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r)
+      controllerContext.isReplicaOnline(r, topicAndPartition) && currentLeaderAndIsr.isr.contains(r)
     }
     newLeaderOpt match {
       case Some(newLeader) => (currentLeaderAndIsr.newLeader(newLeader), reassignedInSyncReplicas)
@@ -150,7 +150,7 @@ class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerConte
       info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
         " Triggering preferred replica leader election")
       // check if preferred replica is not the current leader and is alive and in the isr
-      if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
+      if (controllerContext.isReplicaOnline(preferredReplica, topicAndPartition) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
         val newLeaderAndIsr = currentLeaderAndIsr.newLeader(preferredReplica)
         (newLeaderAndIsr, assignedReplicas)
       } else {
@@ -174,8 +174,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) ext
                    currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     val currentIsr = currentLeaderAndIsr.isr
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
-    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
+    val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition, true))
 
     val newIsr = currentIsr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
     liveAssignedReplicas.find(newIsr.contains) match {

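The PartitionLeaderSelector changes above replace plain broker-liveness checks with isReplicaOnline, so a replica whose log directory has failed is never chosen as leader even though its broker is still registered. The following sketch (not the Kafka implementation) illustrates the offline-partition selection rule after that change, including the unclean-election fallback:

object OfflineLeaderSelectionSketch {
  final case class Selection(leader: Int, newIsr: Seq[Int])

  def selectLeader(assignedReplicas: Seq[Int],
                   isr: Seq[Int],
                   isReplicaOnline: Int => Boolean,
                   uncleanElectionEnabled: Boolean): Option[Selection] = {
    val liveAssigned = assignedReplicas.filter(isReplicaOnline)
    val liveIsr = isr.filter(isReplicaOnline)
    if (liveIsr.nonEmpty)
      Some(Selection(liveIsr.head, liveIsr))                 // clean election from the live ISR
    else if (uncleanElectionEnabled)
      liveAssigned.headOption.map(r => Selection(r, Seq(r))) // unclean: may lose committed data
    else
      None                                                    // NoReplicaOnlineException in the real code
  }

  def main(args: Array[String]): Unit = {
    val online = Set(2, 3)
    println(selectLeader(Seq(1, 2, 3), Seq(1), online, uncleanElectionEnabled = true))
    // Some(Selection(2, List(2))) -- the only ISR member is offline, so an unclean leader is picked
  }
}
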
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 5751e17..5024c02 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -200,7 +200,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       controllerContext.partitionLeadershipInfo.get(topicPartition) match {
         case Some(currentLeaderIsrAndEpoch) =>
           // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
-          if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader))
+          if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition))
             // leader is alive
             partitionState.put(topicPartition, OnlinePartition)
           else
@@ -227,7 +227,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    */
   private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) = {
     val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition).toList
-    val liveAssignedReplicas = replicaAssignment.filter(controllerContext.liveBrokerIds.contains)
+    val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition))
     liveAssignedReplicas.headOption match {
       case None =>
         val failMsg = s"Controller $controllerId epoch ${controller.epoch} encountered error during state change of " +
@@ -259,7 +259,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             topicAndPartition.topic,
             topicAndPartition.partition,
             leaderIsrAndControllerEpoch,
-            replicaAssignment
+            replicaAssignment,
+            isNew = true
           )
         } catch {
           case _: ZkNodeExistsException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 43fac19..87c53b5 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -149,7 +149,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                   .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
               brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                   topic, partition, leaderIsrAndControllerEpoch,
-                                                                  replicaAssignment)
+                                                                  replicaAssignment, isNew = true)
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put(partitionAndReplica, NewReplica)
@@ -283,7 +283,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       val partition = topicPartition.partition
       assignedReplicas.foreach { replicaId =>
         val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
-        if (controllerContext.liveBrokerIds.contains(replicaId))
+        if (controllerContext.isReplicaOnline(replicaId, topicPartition))
           replicaState.put(partitionAndReplica, OnlineReplica)
         else
           // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
@@ -297,6 +297,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   def partitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
     controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
   }
+
 }
 
 sealed trait ReplicaState {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index e483ac2..325488e 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -299,7 +299,7 @@ class TopicDeletionManager(controller: KafkaController, eventManager: Controller
       debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
       controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
         new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) =>
-          eventManager.put(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build)
+          eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj, replicaId))).build)
       if (deadReplicasForTopic.nonEmpty) {
         debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
         markTopicIneligibleForDeletion(Set(topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 9322ff2..db2bd67 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -199,7 +199,8 @@ class GroupMetadataManager(brokerId: Int,
                    | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
                 Errors.COORDINATOR_NOT_AVAILABLE
 
-              case Errors.NOT_LEADER_FOR_PARTITION =>
+              case Errors.NOT_LEADER_FOR_PARTITION
+                   | Errors.KAFKA_STORAGE_ERROR =>
                 Errors.NOT_COORDINATOR
 
               case Errors.REQUEST_TIMED_OUT =>
@@ -212,7 +213,7 @@ class GroupMetadataManager(brokerId: Int,
                 error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
                   s"${status.error.exceptionName}, returning UNKNOWN error code to the client")
 
-                Errors.UNKNOWN
+                Errors.UNKNOWN_SERVER_ERROR
 
               case other =>
                 error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
@@ -342,7 +343,8 @@ class GroupMetadataManager(brokerId: Int,
                        | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
                     Errors.COORDINATOR_NOT_AVAILABLE
 
-                  case Errors.NOT_LEADER_FOR_PARTITION =>
+                  case Errors.NOT_LEADER_FOR_PARTITION
+                       | Errors.KAFKA_STORAGE_ERROR =>
                     Errors.NOT_COORDINATOR
 
                   case Errors.MESSAGE_TOO_LARGE
@@ -695,7 +697,7 @@ class GroupMetadataManager(brokerId: Int,
           val timestampType = TimestampType.CREATE_TIME
           val timestamp = time.milliseconds()
 
-          val partitionOpt = replicaManager.getPartition(appendPartition)
+          val partitionOpt = replicaManager.getPartition(appendPartition).filter(_ ne ReplicaManager.OfflinePartition)
           partitionOpt.foreach { partition =>
             val tombstones = ListBuffer.empty[SimpleRecord]
             removedOffsets.foreach { case (topicPartition, offsetAndMetadata) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 85c19c5..e201e91 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -266,7 +266,7 @@ class TransactionCoordinator(brokerId: Int,
   }
 
   def handleTxnImmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) {
-      txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend)
+    txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend)
   }
 
   def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e0d5076..394817c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -485,12 +485,13 @@ class TransactionStateManager(brokerId: Int,
                | Errors.REQUEST_TIMED_OUT => // note that for timed out request we return NOT_AVAILABLE error code to let client retry
             Errors.COORDINATOR_NOT_AVAILABLE
 
-          case Errors.NOT_LEADER_FOR_PARTITION =>
+          case Errors.NOT_LEADER_FOR_PARTITION
+               | Errors.KAFKA_STORAGE_ERROR =>
             Errors.NOT_COORDINATOR
 
           case Errors.MESSAGE_TOO_LARGE
                | Errors.RECORD_LIST_TOO_LARGE =>
-            Errors.UNKNOWN
+            Errors.UNKNOWN_SERVER_ERROR
 
           case other =>
             other

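Both coordinator hunks above (GroupMetadataManager and TransactionStateManager) apply the same translation: when an append to the internal offsets or transaction topic fails with KAFKA_STORAGE_ERROR, the client is told NOT_COORDINATOR so it rediscovers a healthy coordinator, and the catch-all UNKNOWN becomes UNKNOWN_SERVER_ERROR. A hedged, self-contained sketch of that mapping (the Errors values here are stand-ins for org.apache.kafka.common.protocol.Errors):

object CoordinatorErrorMappingSketch {
  sealed trait Errors
  case object UNKNOWN_TOPIC_OR_PARTITION extends Errors
  case object NOT_ENOUGH_REPLICAS extends Errors
  case object NOT_LEADER_FOR_PARTITION extends Errors
  case object KAFKA_STORAGE_ERROR extends Errors
  case object MESSAGE_TOO_LARGE extends Errors
  case object COORDINATOR_NOT_AVAILABLE extends Errors
  case object NOT_COORDINATOR extends Errors
  case object UNKNOWN_SERVER_ERROR extends Errors

  def mapAppendError(appendError: Errors): Errors = appendError match {
    case UNKNOWN_TOPIC_OR_PARTITION | NOT_ENOUGH_REPLICAS =>
      COORDINATOR_NOT_AVAILABLE                     // topic missing / under-replicated: client retries later
    case NOT_LEADER_FOR_PARTITION | KAFKA_STORAGE_ERROR =>
      NOT_COORDINATOR                               // this broker can no longer serve the group/transaction
    case MESSAGE_TOO_LARGE =>
      UNKNOWN_SERVER_ERROR                          // broker-side problem, not actionable by the client
    case other => other
  }

  def main(args: Array[String]): Unit = {
    assert(mapAppendError(KAFKA_STORAGE_ERROR) == NOT_COORDINATOR)
    println("a storage error now redirects the client to a new coordinator")
  }
}
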
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index bfc6828..d569ad9 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -178,6 +178,12 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
     trimToValidSize()
   }
 
+  def closeHandler() = {
+    // File handler of the index field will be closed after the mmap is garbage collected
+    CoreUtils.swallow(forceUnmap(mmap))
+    mmap = null
+  }
+
   /**
    * Do a basic sanity check on this index to detect obvious problems
    *

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 824d302..0fec75f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -24,11 +24,11 @@ import java.util.concurrent.atomic._
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
 
 import kafka.api.KAFKA_0_10_0_IV0
-import kafka.common._
+import kafka.common.{InvalidOffsetException, KafkaException, LongRef}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
+import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{KafkaStorageException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
 
@@ -136,7 +136,8 @@ class Log(@volatile var dir: File,
           val maxProducerIdExpirationMs: Int,
           val producerIdExpirationCheckIntervalMs: Int,
           val topicPartition: TopicPartition,
-          val producerStateManager: ProducerStateManager) extends Logging with KafkaMetricsGroup {
+          val producerStateManager: ProducerStateManager,
+          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
 
@@ -164,7 +165,7 @@ class Log(@volatile var dir: File,
    * temporary abuse seems justifiable and saves us from scanning the log after deletion to find the first offsets
    * of each ongoing transaction in order to compute a new first unstable offset. It is possible, however,
    * that this could result in disagreement between replicas depending on when they began replicating the log.
-   * In the worst case, the LSO could be seen by a consumer to go backwards. 
+   * In the worst case, the LSO could be seen by a consumer to go backwards.
    */
   @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None
 
@@ -233,7 +234,7 @@ class Log(@volatile var dir: File,
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
     new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
-      new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir)))
+      new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel))
   }
 
   private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
@@ -266,6 +267,7 @@ class Log(@volatile var dir: File,
     swapFiles
   }
 
+  // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded
   private def loadSegmentFiles(): Unit = {
     // load segments in ascending order because transactional data from one segment may depend on the
     // segments that come before it
@@ -340,6 +342,7 @@ class Log(@volatile var dir: File,
     bytesTruncated
   }
 
+  // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded
   private def completeSwapOperations(swapFiles: Set[File]): Unit = {
     for (swapFile <- swapFiles) {
       val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
@@ -366,7 +369,8 @@ class Log(@volatile var dir: File,
     }
   }
 
-  /* Load the log segments from the log files on disk */
+  // Load the log segments from the log files on disk
+  // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded
   private def loadSegments() {
     // first do a pass through the files in the log directory and remove any temporary files
     // and find any interrupted swap operations
@@ -383,14 +387,14 @@ class Log(@volatile var dir: File,
     if(logSegments.isEmpty) {
       // no existing segments, create a new mutable segment beginning at offset 0
       segments.put(0L, new LogSegment(dir = dir,
-                                     startOffset = 0,
-                                     indexIntervalBytes = config.indexInterval,
-                                     maxIndexSize = config.maxIndexSize,
-                                     rollJitterMs = config.randomSegmentJitter,
-                                     time = time,
-                                     fileAlreadyExists = false,
-                                     initFileSize = this.initFileSize(),
-                                     preallocate = config.preallocate))
+                                      startOffset = 0,
+                                      indexIntervalBytes = config.indexInterval,
+                                      maxIndexSize = config.maxIndexSize,
+                                      rollJitterMs = config.randomSegmentJitter,
+                                      time = time,
+                                      fileAlreadyExists = false,
+                                      initFileSize = this.initFileSize(),
+                                      preallocate = config.preallocate))
     } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
       recoverLog()
       // reset the index size of the currently active log segment to allow more entries
@@ -403,6 +407,7 @@ class Log(@volatile var dir: File,
     nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size)
   }
 
+  // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded
   private def recoverLog() {
     // if we have the clean shutdown marker, skip recovery
     if(hasCleanShutdownFile) {
@@ -532,6 +537,16 @@ class Log(@volatile var dir: File,
   }
 
   /**
+   * Close file handlers used by this log but don't write to disk. This is used when the disk may have failed.
+   */
+  def closeHandlers() {
+    debug(s"Closing handlers of log $name")
+    lock synchronized {
+      logSegments.foreach(_.closeHandlers())
+    }
+  }
+
+  /**
    * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
    * @param records The records to append
    * @param isFromClient Whether or not this append is from a producer
@@ -566,16 +581,16 @@ class Log(@volatile var dir: File,
    * @return Information about the appended messages including the first and last offset.
    */
   private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
-    val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
+    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
+      val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
 
-    // return if we have no valid messages or if this is a duplicate of the last appended entry
-    if (appendInfo.shallowCount == 0)
-      return appendInfo
+      // return if we have no valid messages or if this is a duplicate of the last appended entry
+      if (appendInfo.shallowCount == 0)
+        return appendInfo
 
-    // trim any invalid bytes or partial messages before appending it to the on-disk log
-    var validRecords = trimInvalidBytes(records, appendInfo)
+      // trim any invalid bytes or partial messages before appending it to the on-disk log
+      var validRecords = trimInvalidBytes(records, appendInfo)
 
-    try {
       // they are valid, insert them in the log
       lock synchronized {
 
@@ -695,8 +710,6 @@ class Log(@volatile var dir: File,
 
         appendInfo
       }
-    } catch {
-      case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
     }
   }
 
@@ -730,12 +743,14 @@ class Log(@volatile var dir: File,
     // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
     // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
     // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
-    lock synchronized {
-      if (offset > logStartOffset) {
-        logStartOffset = offset
-        leaderEpochCache.clearAndFlushEarliest(logStartOffset)
-        producerStateManager.truncateHead(logStartOffset)
-        updateFirstUnstableOffset()
+    maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $offset in dir ${dir.getParent}") {
+      lock synchronized {
+        if (offset > logStartOffset) {
+          logStartOffset = offset
+          leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+          producerStateManager.truncateHead(logStartOffset)
+          updateFirstUnstableOffset()
+        }
       }
     }
   }
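
The Log.scala hunks in this file wrap each disk-touching operation in maybeHandleIOException so that an IOException marks the containing log directory as offline and surfaces as a KafkaStorageException instead of crashing the broker. A compact sketch of that wrapping pattern, with stand-in LogDirFailureChannel and KafkaStorageException types (not the real kafka.server signatures), might be:

import java.io.IOException
import java.util.concurrent.ArrayBlockingQueue

class KafkaStorageException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

// Collects offline log directories; on the broker a cleanup thread would drain this queue.
class LogDirFailureChannel(capacity: Int) {
  private val offlineLogDirs = new ArrayBlockingQueue[String](capacity)
  def maybeAddOfflineLogDir(logDir: String, msg: String, e: IOException): Unit = {
    offlineLogDirs.offer(logDir)
  }
}

class LogLike(dirParent: String, failureChannel: LogDirFailureChannel) {

  // Run a disk-touching operation; on IOException, report the log dir and rethrow as a storage error.
  private def maybeHandleIOException[T](msg: => String)(fun: => T): T =
    try fun
    catch {
      case e: IOException =>
        failureChannel.maybeAddOfflineLogDir(dirParent, msg, e)
        throw new KafkaStorageException(msg, e)
    }

  // Any operation that touches the disk is wrapped the same way as append/read above.
  def append(write: () => Unit): Unit =
    maybeHandleIOException(s"Error while appending records in dir $dirParent") {
      write()
    }
}
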
@@ -892,64 +907,66 @@ class Log(@volatile var dir: File,
    */
   def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false,
            isolationLevel: IsolationLevel): FetchDataInfo = {
-    trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
-
-    // Because we don't use lock for reading, the synchronization is a little bit tricky.
-    // We create the local variables to avoid race conditions with updates to the log.
-    val currentNextOffsetMetadata = nextOffsetMetadata
-    val next = currentNextOffsetMetadata.messageOffset
-    if (startOffset == next) {
-      val abortedTransactions =
-        if (isolationLevel == IsolationLevel.READ_COMMITTED) Some(List.empty[AbortedTransaction])
-        else None
-      return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false,
-        abortedTransactions = abortedTransactions)
-    }
+    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
+      trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
+
+      // Because we don't use lock for reading, the synchronization is a little bit tricky.
+      // We create the local variables to avoid race conditions with updates to the log.
+      val currentNextOffsetMetadata = nextOffsetMetadata
+      val next = currentNextOffsetMetadata.messageOffset
+      if (startOffset == next) {
+        val abortedTransactions =
+          if (isolationLevel == IsolationLevel.READ_COMMITTED) Some(List.empty[AbortedTransaction])
+          else None
+        return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false,
+          abortedTransactions = abortedTransactions)
+      }
 
-    var segmentEntry = segments.floorEntry(startOffset)
-
-    // return error on attempt to read beyond the log end offset or read below log start offset
-    if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)
-      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
-
-    // Do the read on the segment with a base offset less than the target offset
-    // but if that segment doesn't contain any messages with an offset greater than that
-    // continue to read from successive segments until we get some messages or we reach the end of the log
-    while(segmentEntry != null) {
-      val segment = segmentEntry.getValue
-
-      // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
-      // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
-      // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
-      // end of the active segment.
-      val maxPosition = {
-        if (segmentEntry == segments.lastEntry) {
-          val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
-          // Check the segment again in case a new segment has just rolled out.
-          if (segmentEntry != segments.lastEntry)
+      var segmentEntry = segments.floorEntry(startOffset)
+
+      // return error on attempt to read beyond the log end offset or read below log start offset
+      if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)
+        throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
+
+      // Do the read on the segment with a base offset less than the target offset
+      // but if that segment doesn't contain any messages with an offset greater than that
+      // continue to read from successive segments until we get some messages or we reach the end of the log
+      while (segmentEntry != null) {
+        val segment = segmentEntry.getValue
+
+        // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
+        // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
+        // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
+        // end of the active segment.
+        val maxPosition = {
+          if (segmentEntry == segments.lastEntry) {
+            val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
+            // Check the segment again in case a new segment has just rolled out.
+            if (segmentEntry != segments.lastEntry)
             // New log segment has rolled out, we can read up to the file end.
+              segment.size
+            else
+              exposedPos
+          } else {
             segment.size
-          else
-            exposedPos
-        } else {
-          segment.size
+          }
         }
-      }
-      val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
-      if (fetchInfo == null) {
-        segmentEntry = segments.higherEntry(segmentEntry.getKey)
-      } else {
-        return isolationLevel match {
-          case IsolationLevel.READ_UNCOMMITTED => fetchInfo
-          case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
+        val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
+        if (fetchInfo == null) {
+          segmentEntry = segments.higherEntry(segmentEntry.getKey)
+        } else {
+          return isolationLevel match {
+            case IsolationLevel.READ_UNCOMMITTED => fetchInfo
+            case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
+          }
         }
       }
-    }
 
-    // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
-    // this can happen when all messages with offset larger than start offsets have been deleted.
-    // In this case, we will return the empty set with log end offset metadata
-    FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
+      // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
+      // this can happen when all messages with offset larger than start offsets have been deleted.
+      // In this case, we will return the empty set with log end offset metadata
+      FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
+    }
   }
 
   private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
@@ -1011,35 +1028,37 @@ class Log(@volatile var dir: File,
    *         None if no such message is found.
    */
   def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = {
-    debug(s"Searching offset for timestamp $targetTimestamp")
+    maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
+      debug(s"Searching offset for timestamp $targetTimestamp")
 
-    if (config.messageFormatVersion < KAFKA_0_10_0_IV0 &&
+      if (config.messageFormatVersion < KAFKA_0_10_0_IV0 &&
         targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP &&
         targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP)
-      throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
+        throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
           s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
           s"required version $KAFKA_0_10_0_IV0")
 
-    // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
-    // constant time access while being safe to use with concurrent collections unlike `toArray`.
-    val segmentsCopy = logSegments.toBuffer
-    // For the earliest and latest, we do not need to return the timestamp.
-    if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
+      // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+      // constant time access while being safe to use with concurrent collections unlike `toArray`.
+      val segmentsCopy = logSegments.toBuffer
+      // For the earliest and latest, we do not need to return the timestamp.
+      if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
         return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
-    else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
+      else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
         return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
 
-    val targetSeg = {
-      // Get all the segments whose largest timestamp is smaller than target timestamp
-      val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
-      // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
-      if (earlierSegs.length < segmentsCopy.length)
-        Some(segmentsCopy(earlierSegs.length))
-      else
-        None
-    }
+      val targetSeg = {
+        // Get all the segments whose largest timestamp is smaller than target timestamp
+        val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
+        // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
+        if (earlierSegs.length < segmentsCopy.length)
+          Some(segmentsCopy(earlierSegs.length))
+        else
+          None
+      }
 
-    targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
+      targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
+    }
   }
 
   /**
@@ -1070,18 +1089,20 @@ class Log(@volatile var dir: File,
   }
 
   private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
-    val numToDelete = deletable.size
-    if (numToDelete > 0) {
-      // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
-      if (segments.size == numToDelete)
-        roll()
-      lock synchronized {
-        // remove the segments for lookups
-        deletable.foreach(deleteSegment)
-        maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
+    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
+      val numToDelete = deletable.size
+      if (numToDelete > 0) {
+        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
+        if (segments.size == numToDelete)
+          roll()
+        lock synchronized {
+          // remove the segments for lookups
+          deletable.foreach(deleteSegment)
+          maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
+        }
       }
+      numToDelete
     }
-    numToDelete
   }
 
   /**
@@ -1204,58 +1225,60 @@ class Log(@volatile var dir: File,
    * @return The newly rolled segment
    */
   def roll(expectedNextOffset: Long = 0): LogSegment = {
-    val start = time.nanoseconds
-    lock synchronized {
-      val newOffset = math.max(expectedNextOffset, logEndOffset)
-      val logFile = Log.logFile(dir, newOffset)
-      val offsetIdxFile = offsetIndexFile(dir, newOffset)
-      val timeIdxFile = timeIndexFile(dir, newOffset)
-      val txnIdxFile = transactionIndexFile(dir, newOffset)
-      for(file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
-        warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
-        file.delete()
-      }
-
-      segments.lastEntry() match {
-        case null =>
-        case entry => {
-          val seg = entry.getValue
-          seg.onBecomeInactiveSegment()
-          seg.index.trimToValidSize()
-          seg.timeIndex.trimToValidSize()
-          seg.log.trim()
+    maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
+      val start = time.nanoseconds
+      lock synchronized {
+        val newOffset = math.max(expectedNextOffset, logEndOffset)
+        val logFile = Log.logFile(dir, newOffset)
+        val offsetIdxFile = offsetIndexFile(dir, newOffset)
+        val timeIdxFile = timeIndexFile(dir, newOffset)
+        val txnIdxFile = transactionIndexFile(dir, newOffset)
+        for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
+          warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
+          file.delete()
         }
-      }
 
-      // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
-      // offset align with the new segment offset since this ensures we can recover the segment by beginning
-      // with the corresponding snapshot file and scanning the segment data. Because the segment base offset
-      // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
-      // we manually override the state offset here prior to taking the snapshot.
-      producerStateManager.updateMapEndOffset(newOffset)
-      producerStateManager.takeSnapshot()
+        segments.lastEntry() match {
+          case null =>
+          case entry => {
+            val seg = entry.getValue
+            seg.onBecomeInactiveSegment()
+            seg.index.trimToValidSize()
+            seg.timeIndex.trimToValidSize()
+            seg.log.trim()
+          }
+        }
 
-      val segment = new LogSegment(dir,
-                                   startOffset = newOffset,
-                                   indexIntervalBytes = config.indexInterval,
-                                   maxIndexSize = config.maxIndexSize,
-                                   rollJitterMs = config.randomSegmentJitter,
-                                   time = time,
-                                   fileAlreadyExists = false,
-                                   initFileSize = initFileSize,
-                                   preallocate = config.preallocate)
-      val prev = addSegment(segment)
-      if(prev != null)
-        throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
-      // We need to update the segment base offset and append position data of the metadata when log rolls.
-      // The next offset should not change.
-      updateLogEndOffset(nextOffsetMetadata.messageOffset)
-      // schedule an asynchronous flush of the old segment
-      scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
-
-      info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
+        // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
+        // offset align with the new segment offset since this ensures we can recover the segment by beginning
+        // with the corresponding snapshot file and scanning the segment data. Because the segment base offset
+        // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
+        // we manually override the state offset here prior to taking the snapshot.
+        producerStateManager.updateMapEndOffset(newOffset)
+        producerStateManager.takeSnapshot()
 
-      segment
+        val segment = new LogSegment(dir,
+          startOffset = newOffset,
+          indexIntervalBytes = config.indexInterval,
+          maxIndexSize = config.maxIndexSize,
+          rollJitterMs = config.randomSegmentJitter,
+          time = time,
+          fileAlreadyExists = false,
+          initFileSize = initFileSize,
+          preallocate = config.preallocate)
+        val prev = addSegment(segment)
+        if (prev != null)
+          throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
+        // We need to update the segment base offset and append position data of the metadata when log rolls.
+        // The next offset should not change.
+        updateLogEndOffset(nextOffsetMetadata.messageOffset)
+        // schedule an asynchronous flush of the old segment
+        scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
+
+        info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0 * 1000.0)))
+
+        segment
+      }
     }
   }
 
@@ -1275,22 +1298,24 @@ class Log(@volatile var dir: File,
    * @param offset The offset to flush up to (non-inclusive); the new recovery point
    */
   def flush(offset: Long) : Unit = {
-    if (offset <= this.recoveryPoint)
-      return
-    debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
-          time.milliseconds + " unflushed = " + unflushedMessages)
-    for(segment <- logSegments(this.recoveryPoint, offset))
-      segment.flush()
+    maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
+      if (offset <= this.recoveryPoint)
+        return
+      debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
+        time.milliseconds + " unflushed = " + unflushedMessages)
+      for (segment <- logSegments(this.recoveryPoint, offset))
+        segment.flush()
 
-    // now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain
-    // the snapshots from the recent segments in case we need to truncate and rebuild the producer state.
-    // Otherwise, we would always need to rebuild from the earliest segment.
-    producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset))
+      // now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain
+      // the snapshots from the recent segments in case we need to truncate and rebuild the producer state.
+      // Otherwise, we would always need to rebuild from the earliest segment.
+      producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset))
 
-    lock synchronized {
-      if(offset > this.recoveryPoint) {
-        this.recoveryPoint = offset
-        lastflushedTime.set(time.milliseconds)
+      lock synchronized {
+        if (offset > this.recoveryPoint) {
+          this.recoveryPoint = offset
+          lastflushedTime.set(time.milliseconds)
+        }
       }
     }
   }
@@ -1310,11 +1335,13 @@ class Log(@volatile var dir: File,
    * Completely delete this log directory and all contents from the file system with no delay
    */
   private[log] def delete() {
-    lock synchronized {
-      logSegments.foreach(_.delete())
-      segments.clear()
-      leaderEpochCache.clear()
-      Utils.delete(dir)
+    maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
+      lock synchronized {
+        logSegments.foreach(_.delete())
+        segments.clear()
+        leaderEpochCache.clear()
+        Utils.delete(dir)
+      }
     }
   }
 
@@ -1344,25 +1371,27 @@ class Log(@volatile var dir: File,
    * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
    */
   private[log] def truncateTo(targetOffset: Long) {
-    if(targetOffset < 0)
-      throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
-    if(targetOffset >= logEndOffset) {
-      info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1))
-      return
-    }
-    info("Truncating log %s to offset %d.".format(name, targetOffset))
-    lock synchronized {
-      if(segments.firstEntry.getValue.baseOffset > targetOffset) {
-        truncateFullyAndStartAt(targetOffset)
-      } else {
-        val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
-        deletable.foreach(deleteSegment)
-        activeSegment.truncateTo(targetOffset)
-        updateLogEndOffset(targetOffset)
-        this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
-        this.logStartOffset = math.min(targetOffset, this.logStartOffset)
-        leaderEpochCache.clearAndFlushLatest(targetOffset)
-        loadProducerState(targetOffset, reloadFromCleanShutdown = false)
+    maybeHandleIOException(s"Error while truncating log to offset $targetOffset for $topicPartition in dir ${dir.getParent}") {
+      if (targetOffset < 0)
+        throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
+      if (targetOffset >= logEndOffset) {
+        info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset - 1))
+        return
+      }
+      info("Truncating log %s to offset %d.".format(name, targetOffset))
+      lock synchronized {
+        if (segments.firstEntry.getValue.baseOffset > targetOffset) {
+          truncateFullyAndStartAt(targetOffset)
+        } else {
+          val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
+          deletable.foreach(deleteSegment)
+          activeSegment.truncateTo(targetOffset)
+          updateLogEndOffset(targetOffset)
+          this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
+          this.logStartOffset = math.min(targetOffset, this.logStartOffset)
+          leaderEpochCache.clearAndFlushLatest(targetOffset)
+          loadProducerState(targetOffset, reloadFromCleanShutdown = false)
+        }
       }
     }
   }
@@ -1373,28 +1402,30 @@ class Log(@volatile var dir: File,
    *  @param newOffset The new offset to start the log with
    */
   private[log] def truncateFullyAndStartAt(newOffset: Long) {
-    debug(s"Truncate and start log '$name' at offset $newOffset")
-    lock synchronized {
-      val segmentsToDelete = logSegments.toList
-      segmentsToDelete.foreach(deleteSegment)
-      addSegment(new LogSegment(dir,
-                                newOffset,
-                                indexIntervalBytes = config.indexInterval,
-                                maxIndexSize = config.maxIndexSize,
-                                rollJitterMs = config.randomSegmentJitter,
-                                time = time,
-                                fileAlreadyExists = false,
-                                initFileSize = initFileSize,
-                                preallocate = config.preallocate))
-      updateLogEndOffset(newOffset)
-      leaderEpochCache.clearAndFlush()
-
-      producerStateManager.truncate()
-      producerStateManager.updateMapEndOffset(newOffset)
-      updateFirstUnstableOffset()
+    maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
+      debug(s"Truncate and start log '$name' at offset $newOffset")
+      lock synchronized {
+        val segmentsToDelete = logSegments.toList
+        segmentsToDelete.foreach(deleteSegment)
+        addSegment(new LogSegment(dir,
+          newOffset,
+          indexIntervalBytes = config.indexInterval,
+          maxIndexSize = config.maxIndexSize,
+          rollJitterMs = config.randomSegmentJitter,
+          time = time,
+          fileAlreadyExists = false,
+          initFileSize = initFileSize,
+          preallocate = config.preallocate))
+        updateLogEndOffset(newOffset)
+        leaderEpochCache.clearAndFlush()
+
+        producerStateManager.truncate()
+        producerStateManager.updateMapEndOffset(newOffset)
+        updateFirstUnstableOffset()
 
-      this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
-      this.logStartOffset = newOffset
+        this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
+        this.logStartOffset = newOffset
+      }
     }
   }
 
@@ -1439,6 +1470,9 @@ class Log(@volatile var dir: File,
    * This allows reads to happen concurrently without synchronization and without the possibility of physically
    * deleting a file while it is being read from.
    *
+   * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded
+   * or the immediate caller will catch and handle IOException
+   *
    * @param segment The log segment to schedule for deletion
    */
   private def deleteSegment(segment: LogSegment) {
@@ -1452,13 +1486,18 @@ class Log(@volatile var dir: File,
   /**
    * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
    *
-   * @throws KafkaStorageException if the file can't be renamed and still exists
+   * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
+   * it is either called before all logs are loaded or the caller will catch and handle IOException
+   *
+   * @throws IOException if the file can't be renamed and still exists
    */
   private def asyncDeleteSegment(segment: LogSegment) {
     segment.changeFileSuffixes("", Log.DeletedFileSuffix)
     def deleteSeg() {
       info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
-      segment.delete()
+      maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
+        segment.delete()
+      }
     }
     scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
   }
@@ -1467,6 +1506,9 @@ class Log(@volatile var dir: File,
    * Swap a new segment in place and delete one or more existing segments in a crash-safe manner. The old segments will
    * be asynchronously deleted.
    *
+   * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded
+   * or the caller will catch and handle IOException
+   *
    * The sequence of operations is:
    * <ol>
    *   <li> Cleaner creates new segment with suffix .cleaned and invokes replaceSegments().
@@ -1524,6 +1566,16 @@ class Log(@volatile var dir: File,
    */
   def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment)
 
+  private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
+    try {
+      fun
+    } catch {
+      case e: IOException =>
+        logDirFailureChannel.maybeAddLogFailureEvent(dir.getParent)
+        throw new KafkaStorageException(msg, e)
+    }
+  }
+
 }
 
 /**
@@ -1574,11 +1626,12 @@ object Log {
             brokerTopicStats: BrokerTopicStats,
             time: Time = Time.SYSTEM,
             maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
-            producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000): Log = {
+            producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
+            logDirFailureChannel: LogDirFailureChannel = null): Log = {
     val topicPartition = Log.parseTopicPartitionName(dir)
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs,
-      producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager)
+      producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel)
   }
 
   /**


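The core change to Log.scala in this hunk set is the new maybeHandleIOException wrapper: every IO-touching operation (append, read, roll, flush, truncate, delete) is funnelled through it, so that an IOException marks the whole log directory offline via LogDirFailureChannel.maybeAddLogFailureEvent and is re-thrown to callers as a KafkaStorageException. The following is a minimal standalone sketch of that pattern, not the actual classes from the patch: KafkaStorageException and LogDirFailureChannel here are simplified stand-ins, and takeNextOfflineLogDir is a hypothetical accessor for whatever thread consumes the failure events.

import java.io.IOException
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingDeque}

// Illustrative stand-in for org.apache.kafka.common.errors.KafkaStorageException.
class KafkaStorageException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

// Simplified failure channel: records each offline log directory once and queues it
// for a handler thread to pick up (the real class is introduced elsewhere in this commit).
class LogDirFailureChannel {
  private val offlineDirs = new ConcurrentHashMap[String, String]()
  private val queue = new LinkedBlockingDeque[String]()

  def maybeAddLogFailureEvent(logDir: String): Unit = {
    // Only the first failure for a given directory is queued; later ones are ignored.
    if (offlineDirs.putIfAbsent(logDir, logDir) == null)
      queue.add(logDir)
  }

  // Hypothetical accessor; blocks until a directory failure has been reported.
  def takeNextOfflineLogDir(): String = queue.take()
}

// Sketch of how a Log-like class wires the wrapper around an IO-bound operation such as flush().
class FailableLog(dir: java.io.File, logDirFailureChannel: LogDirFailureChannel) {

  private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
    try {
      fun
    } catch {
      case e: IOException =>
        // Mark the parent log directory offline exactly once, then surface a storage error.
        logDirFailureChannel.maybeAddLogFailureEvent(dir.getParent)
        throw new KafkaStorageException(msg, e)
    }
  }

  def flush(): Unit =
    maybeHandleIOException(s"Error while flushing log in dir ${dir.getParent}") {
      // segment.flush(), recovery-point update, etc. would go here
    }
}

A broker-side handler thread can then block on takeNextOfflineLogDir() and take the affected replicas offline, which is roughly the flow the rest of KIP-112 builds on; this sketch only shows the per-Log error funnel, not that handling.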