kafka-commits mailing list archives

From: jun...@apache.org
Subject: kafka git commit: KAFKA-2721; Avoid handling duplicate LeaderAndISR requests
Date: Mon, 16 Nov 2015 23:50:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 99d9ddc8e -> 21ea9cbc0


KAFKA-2721; Avoid handling duplicate LeaderAndISR requests

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #436 from lindong28/KAFKA-2721

(cherry picked from commit 6df9e7ff2c6cfb3c7ca16f94928d0e86f3d087e2)
Signed-off-by: Jun Rao <junrao@gmail.com>
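
In short, the patch makes Partition.makeLeader report whether the local replica actually transitioned to leader, so that ReplicaManager.makeLeaders can skip the become-leader side effects for partitions this broker already leads. The snippet below is a minimal, self-contained Scala sketch of that pattern only; SimplePartition, makeLeaders and the other names are simplified stand-ins for illustration, not the real Kafka classes.

import scala.collection.mutable

object DuplicateLeaderAndIsrSketch {

  // Simplified stand-in for kafka.cluster.Partition (illustration only).
  class SimplePartition(val topic: String, val partitionId: Int) {
    private var leaderReplicaIdOpt: Option[Int] = None

    // Returns true only if this broker was not already the leader,
    // mirroring the new Boolean result of Partition.makeLeader in the diff below.
    def makeLeader(localBrokerId: Int): Boolean =
      leaderReplicaIdOpt match {
        case Some(id) if id == localBrokerId => false // duplicate LeaderAndIsr request: nothing to do
        case _ =>
          leaderReplicaIdOpt = Some(localBrokerId)
          true
      }
  }

  // Simplified stand-in for ReplicaManager.makeLeaders: only partitions that
  // really became leader are collected and acted upon.
  def makeLeaders(localBrokerId: Int, partitions: Seq[SimplePartition]): Set[SimplePartition] = {
    val partitionsToMakeLeaders = mutable.Set[SimplePartition]()
    partitions.foreach { p =>
      if (p.makeLeader(localBrokerId))
        partitionsToMakeLeaders += p
      else
        println(s"skipping duplicate become-leader state change for [${p.topic},${p.partitionId}]")
    }
    partitionsToMakeLeaders.toSet
  }

  def main(args: Array[String]): Unit = {
    val p = new SimplePartition("test", 0)
    println(makeLeaders(1, Seq(p)).size) // 1: first request performs the transition
    println(makeLeaders(1, Seq(p)).size) // 0: duplicate request is skipped
  }
}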


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21ea9cbc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21ea9cbc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21ea9cbc

Branch: refs/heads/0.9.0
Commit: 21ea9cbc0de08c303a9d12b2d36e2f7ee38ff113
Parents: 99d9ddc
Author: Dong Lin <lindong28@gmail.com>
Authored: Mon Nov 16 15:50:46 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Nov 16 15:50:52 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 39 +++++++++++---------
 .../scala/kafka/server/ReplicaManager.scala     | 29 ++++++++++-----
 2 files changed, 42 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/21ea9cbc/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 70c8d99..3805dcc 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -157,12 +157,12 @@ class Partition(val topic: String,
   }
 
   /**
-   * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
-   *  and setting the new leader and ISR
+   * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
+   * from the time when this broker was the leader last time) and setting the new leader and ISR.
+   * If the leader replica id does not change, return false to indicate the replica manager.
    */
-  def makeLeader(controllerId: Int,
-                 partitionStateInfo: PartitionStateInfo, correlationId: Int) {
-    val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
+  def makeLeader(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
+    val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -177,29 +177,34 @@ class Partition(val topic: String,
       inSyncReplicas = newInSyncReplicas
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
-      leaderReplicaIdOpt = Some(localBrokerId)
-      // construct the high watermark metadata for the new leader replica
-      val newLeaderReplica = getReplica().get
-      newLeaderReplica.convertHWToLocalOffsetMetadata()
-      // reset log end offset for remote replicas
-      assignedReplicas.foreach(r =>
-        if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult))
+      val isNewLeader =
+        if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
+          false
+        } else {
+          leaderReplicaIdOpt = Some(localBrokerId)
+          true
+        }
+      val leaderReplica = getReplica().get
       // we may need to increment high watermark since ISR could be down to 1
-      maybeIncrementLeaderHW(newLeaderReplica)
+      if (isNewLeader) {
+        // construct the high watermark metadata for the new leader replica
+        leaderReplica.convertHWToLocalOffsetMetadata()
+        // reset log end offset for remote replicas
+        assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
+      }
+      (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
     }
-
     // some delayed operations may be unblocked after HW changed
     if (leaderHWIncremented)
       tryCompleteDelayedRequests()
+    isNewLeader
   }
 
   /**
    *  Make the local replica the follower by setting the new leader and ISR to empty
    *  If the leader replica id does not change, return false to indicate the replica manager
    */
-  def makeFollower(controllerId: Int,
-                   partitionStateInfo: PartitionStateInfo,
-                   correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch

http://git-wip-us.apache.org/repos/asf/kafka/blob/21ea9cbc/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7823659..0dde914 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -618,6 +618,7 @@ class ReplicaManager(val config: KafkaConfig,
                "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                 .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                 topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(",")))
+              responseMap.put((topic, partitionId), ErrorMapping.UnknownTopicOrPartitionCode)
             }
           } else {
             // Otherwise record the error code in response
@@ -663,7 +664,9 @@ class ReplicaManager(val config: KafkaConfig,
    * 3. Add these partitions to the leader partitions set
    *
    * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
-   * the error message will be set on each partition since we do not know which partition caused it
+   * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
+   * return the set of partitions that are made leader due to this method
+   *
    *  TODO: the above may need to be fixed later
    */
   private def makeLeaders(controllerId: Int,
@@ -679,18 +682,25 @@ class ReplicaManager(val config: KafkaConfig,
     for (partition <- partitionState.keys)
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
+    val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
+
     try {
       // First stop fetchers for all the partitions
       replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
-      partitionState.foreach { state =>
+      // Update the partition information to be the leader
+      partitionState.foreach{ case (partition, partitionStateInfo) =>
+        if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
+          partitionsToMakeLeaders += partition
+        else
+          stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
+            "controller %d epoch %d for partition %s since it is already the leader for the partition.")
+            .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(partition.topic, partition.partitionId)));
+      }
+      partitionsToMakeLeaders.foreach { partition =>
        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
          "%d epoch %d with correlation id %d for partition %s")
-          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
+          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
       }
-      // Update the partition information to be the leader
-      partitionState.foreach{ case (partition, partitionStateInfo) =>
-        partition.makeLeader(controllerId, partitionStateInfo, correlationId)}
-
     } catch {
       case e: Throwable =>
         partitionState.foreach { state =>
@@ -709,7 +719,7 @@ class ReplicaManager(val config: KafkaConfig,
        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
     }
 
-    partitionState.keySet
+    partitionsToMakeLeaders
   }
 
   /*
@@ -726,7 +736,8 @@ class ReplicaManager(val config: KafkaConfig,
    * are guaranteed to be flushed to disks
    *
    * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
-   * the error message will be set on each partition since we do not know which partition caused it
+   * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
+   * return the set of partitions that are made follower due to this method
    */
   private def makeFollowers(controllerId: Int,
                             epoch: Int,

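One more behavioral note on the first ReplicaManager.scala hunk: when a LeaderAndIsr request names a partition whose assigned replica list does not include the local broker, the response map now carries an UnknownTopicOrPartition error for that partition instead of omitting it. Below is a rough Scala sketch of that bookkeeping under simplified, hypothetical types (LeaderAndIsrResponseSketch and buildResponse are not Kafka's actual classes); only the numeric error code mirrors kafka.common.ErrorMapping.

import scala.collection.mutable

object LeaderAndIsrResponseSketch {
  val NoErrorCode: Short = 0
  val UnknownTopicOrPartitionCode: Short = 3 // numeric value matches kafka.common.ErrorMapping

  // requested: (topic, partitionId, assigned replica broker ids)
  def buildResponse(requested: Seq[(String, Int, Set[Int])],
                    localBrokerId: Int): Map[(String, Int), Short] = {
    val responseMap = mutable.Map[(String, Int), Short]()
    requested.foreach { case (topic, partition, assignedReplicas) =>
      if (assignedReplicas.contains(localBrokerId))
        responseMap.put((topic, partition), NoErrorCode)
      else
        // the patch records an explicit error here instead of silently skipping the partition
        responseMap.put((topic, partition), UnknownTopicOrPartitionCode)
    }
    responseMap.toMap
  }

  def main(args: Array[String]): Unit = {
    val resp = buildResponse(Seq(("test", 0, Set(1, 2)), ("test", 1, Set(2, 3))), localBrokerId = 1)
    println(resp) // partition 1 maps to UnknownTopicOrPartitionCode since broker 1 is not assigned
  }
}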
