kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: KAFKA-1647; Create replicas on follower transition even if leader is unavailable, otherwise replication offset checkpoints (high water marks) can be lost on hard kills and restarts; reviewed by Joel Koshy, Neha Narkhede, Jun Rao and Guozhang Wang
Date Thu, 30 Oct 2014 23:32:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 9be9a80d0 -> 2ec356fa9


KAFKA-1647; Create replicas on follower transition even if leader is
unavailable, otherwise replication offset checkpoints (high water marks)
can be lost on hard kills and restarts; reviewed by Joel Koshy, Neha
Narkhede, Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2ec356fa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2ec356fa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2ec356fa

Branch: refs/heads/0.8.2
Commit: 2ec356fa903bc65026bef9f6c1b35dd182dbe48d
Parents: 9be9a80
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Thu Oct 30 16:32:33 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Oct 30 16:32:41 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/ReplicaManager.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2ec356fa/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 78b7514..0637fab 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -47,8 +47,8 @@ case class PartitionDataAndOffset(data: FetchResponsePartitionData, offset: LogO
 
 
 class ReplicaManager(val config: KafkaConfig, 
-                     time: Time, 
-                     val zkClient: ZkClient, 
+                     time: Time,
+                     val zkClient: ZkClient,
                      scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup
{
@@ -482,6 +482,7 @@ class ReplicaManager(val config: KafkaConfig,
         val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
         val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
         leaders.find(_.id == newLeaderBrokerId) match {
+          // Only change partition state when the leader is available
           case Some(leaderBroker) =>
             if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
               partitionsToMakeFollower += partition
@@ -493,10 +494,13 @@ class ReplicaManager(val config: KafkaConfig,
           case None =>
             // The leader broker should always be present in the leaderAndIsrRequest.
             // If not, we should record the error message and abort the transition process for this partition
-            stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
-              "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available")
+            stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
+              " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
               .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
               partition.topic, partition.partitionId, newLeaderBrokerId))
+            // Create the local replica even if the leader is unavailable. This is required to ensure that we include
+            // the partition's high watermark in the checkpoint file (see KAFKA-1647)
+            partition.getOrCreateReplica()
         }
       }
 


Mime
View raw message