kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745)
Date Fri, 12 Oct 2018 17:18:40 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new c7131fc  KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745)
c7131fc is described below

commit c7131fc7a3efcc338321f72de1f159f1f06cfcfb
Author: Jun Rao <junrao@gmail.com>
AuthorDate: Fri Oct 12 10:11:54 2018 -0700

    KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745)
    
    Reviewers: Dong Lin <lindong28@gmail.com>
---
 core/src/main/scala/kafka/controller/PartitionStateMachine.scala    | 6 +++---
 .../scala/unit/kafka/controller/PartitionStateMachineTest.scala     | 5 +++--
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 663ee8d..e4f0532 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -441,12 +441,12 @@ class PartitionStateMachine(config: KafkaConfig,
   Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
     leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
       val assignment = controllerContext.partitionReplicaAssignment(partition)
-      val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+      val liveOrShuttingDownReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true))
       val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-      val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveReplicas.toSet, shuttingDownBrokers)
+      val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveOrShuttingDownReplicas.toSet, shuttingDownBrokers)
       val newIsr = isr.filter(replica => !controllerContext.shuttingDownBrokerIds.contains(replica))
       val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr))
-      (partition, newLeaderAndIsrOpt, liveReplicas)
+      (partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index b89632e..3370b54 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -192,7 +192,9 @@ class PartitionStateMachineTest extends JUnitSuite {
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
-    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
+
+    // The leaderAndIsr request should be sent to both brokers, including the shutting down one
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId, otherBrokerId),
       partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId, otherBrokerId),
       isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
@@ -455,5 +457,4 @@ class PartitionStateMachineTest extends JUnitSuite {
     topicDeletionManager.enqueueTopicsForDeletion(Set(topic))
     assertEquals(s"There should be no offline partition(s)", 0, partitionStateMachine.offlinePartitionCount)
   }
-
 }


Mime
View raw message