Repository: kafka
Updated Branches:
  refs/heads/0.10.1 d6b3ff142 -> 823c08067


KAFKA-4214; kafka-reassign-partitions fails all the time when brokers are bounced during reassignment

There is a corner case bug: during partition reassignment, if the controller and a broker receiving a new replica are bounced at the same time, the partition reassignment fails.

The cause of this bug is a block of code in the KafkaController which fails the reassignment if aliveNewReplicas != newReplicas, i.e., if some of the new replicas are offline at the time a controller fails over.

The fix is to have the controller listen for ISR change events even for new replicas which are not alive when the controller boots up. Once those replicas come online and join the ISR set, the new controller will detect this and mark the reassignment as successful.

Interestingly, the block of code in question was introduced in KAFKA-990, where a concern about this exact scenario was raised :)

This bug was revealed in the system tests in https://github.com/apache/kafka/pull/1904. The relevant tests will be enabled in either this or a follow-up PR when PR-1904 is merged.

Thanks to junrao for identifying the issue and providing the patch.

Author: Apurva Mehta
Reviewers: Jun Rao

Closes #1910 from apurvam/KAFKA-4214

(cherry picked from commit be6056abc9970600347c95c4c8658799b76dbe6b)
Signed-off-by: Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/823c0806
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/823c0806
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/823c0806

Branch: refs/heads/0.10.1
Commit: 823c08067c189791d97ca4bd0548ffa308684dd7
Parents: d6b3ff1
Author: Apurva Mehta
Authored: Mon Sep 26 17:18:18 2016 -0700
Committer: Jun Rao
Committed: Mon Sep 26 17:18:29 2016 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 21 +++++++-------------
 1 file changed, 7 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/823c0806/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 04bd3f4..063ea6f 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -631,20 +631,13 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
             throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
               " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
           } else {
-            if(aliveNewReplicas == newReplicas) {
-              info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
-              // first register ISR change listener
-              watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
-              controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
-              // mark topic ineligible for deletion for the partitions being reassigned
-              deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
-              onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-            } else {
-              // some replica in RAR is not alive. Fail partition reassignment
-              throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
-                " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
-                "Failing partition reassignment")
-            }
+            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
+            // first register ISR change listener
+            watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
+            controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+            // mark topic ineligible for deletion for the partitions being reassigned
+            deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
+            onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
           }
         case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
           .format(topicAndPartition))
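
For readers who want the behavioural change in isolation, here is a minimal, self-contained sketch of the decision the commit message describes. It is not the KafkaController code: `ReassignmentSketch`, `Reassignment`, `Outcome`, `oldInitiate`, `newInitiate` and `onIsrChange` are hypothetical names invented for illustration, replicas are modelled as a plain `Set[Int]` of broker ids, and the assumption that a reassignment completes once every new replica is in the ISR is taken from the commit message rather than from the controller source.

```scala
// Hypothetical, simplified model of the change; none of these names exist in Kafka.
object ReassignmentSketch {
  // The target replica set of a partition reassignment, as broker ids.
  final case class Reassignment(newReplicas: Set[Int])

  sealed trait Outcome
  case object Failed extends Outcome
  case object InProgress extends Outcome
  case object Completed extends Outcome

  // Behaviour before the fix: if any new replica is offline when the
  // controller picks up the reassignment, the reassignment is failed.
  def oldInitiate(r: Reassignment, aliveBrokers: Set[Int]): Outcome = {
    val aliveNewReplicas = r.newReplicas.intersect(aliveBrokers)
    if (aliveNewReplicas == r.newReplicas) InProgress else Failed
  }

  // Behaviour after the fix: liveness is no longer checked up front. The
  // reassignment is always tracked and ISR changes are watched instead,
  // so aliveBrokers is deliberately ignored here.
  def newInitiate(r: Reassignment, aliveBrokers: Set[Int]): Outcome = InProgress

  // Assumed completion rule for this sketch: the reassignment is done once
  // every new replica has joined the ISR.
  def onIsrChange(r: Reassignment, isr: Set[Int]): Outcome =
    if (r.newReplicas.subsetOf(isr)) Completed else InProgress

  def main(args: Array[String]): Unit = {
    val r = Reassignment(Set(1, 2, 3))
    val alive = Set(1, 2)                      // broker 3 is being bounced
    println(oldInitiate(r, alive))             // Failed
    println(newInitiate(r, alive))             // InProgress
    println(onIsrChange(r, Set(1, 2)))         // InProgress
    println(onIsrChange(r, Set(1, 2, 3)))      // Completed
  }
}
```

With broker 3 offline at controller failover, the old check fails the reassignment outright, while the new path keeps it in progress until a later ISR change shows all new replicas caught up.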