This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new c7131fc KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745)
c7131fc is described below
commit c7131fc7a3efcc338321f72de1f159f1f06cfcfb
Author: Jun Rao <junrao@gmail.com>
AuthorDate: Fri Oct 12 10:11:54 2018 -0700
KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745)
Reviewers: Dong Lin <lindong28@gmail.com>
---
core/src/main/scala/kafka/controller/PartitionStateMachine.scala | 6 +++---
.../scala/unit/kafka/controller/PartitionStateMachineTest.scala | 5 +++--
2 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 663ee8d..e4f0532 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -441,12 +441,12 @@ class PartitionStateMachine(config: KafkaConfig,
Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
val assignment = controllerContext.partitionReplicaAssignment(partition)
- val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica,
partition))
+ val liveOrShuttingDownReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica,
partition, includeShuttingDownBrokers = true))
val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
- val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment,
isr, liveReplicas.toSet, shuttingDownBrokers)
+ val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment,
isr, liveOrShuttingDownReplicas.toSet, shuttingDownBrokers)
val newIsr = isr.filter(replica => !controllerContext.shuttingDownBrokerIds.contains(replica))
val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader,
newIsr))
- (partition, newLeaderAndIsrOpt, liveReplicas)
+ (partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas)
}
}
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index b89632e..3370b54 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -192,7 +192,9 @@ class PartitionStateMachineTest extends JUnitSuite {
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection),
controllerEpoch, controllerContext.epochZkVersion))
.andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty,
Map.empty))
- EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
+
+ // The leaderAndIsr request should be sent to both brokers, including the shutting down one
+ EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId,
otherBrokerId),
partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId,
otherBrokerId),
isNew = false))
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
@@ -455,5 +457,4 @@ class PartitionStateMachineTest extends JUnitSuite {
topicDeletionManager.enqueueTopicsForDeletion(Set(topic))
assertEquals(s"There should be no offline partition(s)", 0, partitionStateMachine.offlinePartitionCount)
}
-
}
|