kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5745; makeLeader should invoke `convertHWToLocalOffsetMetadata` before marking it as leader
Date Fri, 18 Aug 2017 14:59:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a4720b25a -> 3457c4761


KAFKA-5745; makeLeader should invoke `convertHWToLocalOffsetMetadata` before marking it as leader

Author: huxihx <huxi_2b@hotmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3682 from huxihx/KAFKA-5745


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3457c476
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3457c476
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3457c476

Branch: refs/heads/trunk
Commit: 3457c4761ae09ef2befc141278fe489900cac470
Parents: a4720b2
Author: huxihx <huxi_2b@hotmail.com>
Authored: Fri Aug 18 15:58:59 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Aug 18 15:58:59 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Partition.scala | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3457c476/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 925c8f0..9a55200 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -206,13 +206,8 @@ class Partition(val topic: String,
       allReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
 
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
-      val isNewLeader =
-        if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
-          false
-        } else {
-          leaderReplicaIdOpt = Some(localBrokerId)
-          true
-        }
+      val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
+
       val leaderReplica = getReplica().get
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
@@ -221,13 +216,16 @@ class Partition(val topic: String,
         val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
         replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
       }
-      // we may need to increment high watermark since ISR could be down to 1
+
       if (isNewLeader) {
         // construct the high watermark metadata for the new leader replica
         leaderReplica.convertHWToLocalOffsetMetadata()
+        // mark local replica as the leader after converting hw
+        leaderReplicaIdOpt = Some(localBrokerId)
         // reset log end offset for remote replicas
         assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
       }
+      // we may need to increment high watermark since ISR could be down to 1
       (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
     }
     // some delayed operations may be unblocked after HW changed
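
The reordering in the diff above can be sketched in isolation. This is a minimal, hypothetical model and not the actual `kafka.cluster.Partition` code: `MakeLeaderSketch`, the stripped-down `Replica` and `Partition` classes, and the `hwConverted` flag are assumptions made for illustration. It shows `isNewLeader` being computed without side effects, and `leaderReplicaIdOpt` being set only after `convertHWToLocalOffsetMetadata()` has run. The presumed benefit, which is an inference and not stated in the commit message, is that code checking `leaderReplicaIdOpt` never observes a leader whose high watermark metadata is still unconverted.

```scala
// Hypothetical, simplified model of the ordering introduced by this commit.
// It is NOT the real kafka.cluster.Partition implementation.
object MakeLeaderSketch {

  final class Replica {
    // Stand-in for the high watermark metadata state; false until converted.
    @volatile var hwConverted: Boolean = false

    def convertHWToLocalOffsetMetadata(): Unit = {
      hwConverted = true
    }
  }

  final class Partition(val localBrokerId: Int) {
    @volatile var leaderReplicaIdOpt: Option[Int] = None
    val localReplica = new Replica

    // Returns true if this call made the local broker a new leader.
    def makeLeader(): Boolean = {
      // Pure check: no assignment hidden inside the condition.
      val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
      if (isNewLeader) {
        // Convert the high watermark first ...
        localReplica.convertHWToLocalOffsetMetadata()
        // ... and only then mark the local replica as the leader.
        leaderReplicaIdOpt = Some(localBrokerId)
      }
      isNewLeader
    }
  }

  def main(args: Array[String]): Unit = {
    val partition = new Partition(localBrokerId = 1)
    assert(partition.makeLeader())                   // first call: new leader
    assert(partition.localReplica.hwConverted)       // HW converted
    assert(partition.leaderReplicaIdOpt.contains(1)) // leader marked afterwards
    assert(!partition.makeLeader())                  // second call: already leader
  }
}
```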

