kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-4214; kafka-reassign-partitions fails all the time when brokers are bounced during reassignment
Date Tue, 27 Sep 2016 00:18:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 d6b3ff142 -> 823c08067


KAFKA-4214; kafka-reassign-partitions fails all the time when brokers are bounced during reassignment

There is a corner-case bug: if the controller and a broker receiving a
new replica are bounced at the same time during partition reassignment,
the reassignment fails.

The cause of this bug is a block of code in the KafkaController which
fails the reassignment if aliveNewReplicas != newReplicas, i.e. if
some of the new replicas are offline at the time a controller fails
over.

The fix is to have the controller listen for ISR change events even for
new replicas which are not alive when the controller boots up. Once the
said replicas come online, they will be in the ISR set, and the new
controller will detect this, and then mark the reassignment as
successful.
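
The cause and the fix described above can be sketched as a toy model. This is
only an illustration, not the controller code: ReassignmentSketch, Reassignment,
resumeOld, resumeNew and isComplete are hypothetical names, and the completion
rule (every new replica is in the ISR) is an assumption drawn from the
description above.

```scala
// Toy model only: every name here is hypothetical and not part of KafkaController.
object ReassignmentSketch {
  final case class Reassignment(newReplicas: Seq[Int])

  // Pre-fix behaviour: a controller that takes over refuses to resume the
  // reassignment if any target replica is offline at that moment.
  def resumeOld(r: Reassignment, liveBrokers: Set[Int]): Either[String, Reassignment] = {
    val aliveNewReplicas = r.newReplicas.filter(liveBrokers.contains)
    if (aliveNewReplicas == r.newReplicas) Right(r)
    else Left(s"Only ${aliveNewReplicas.mkString(",")} of ${r.newReplicas.mkString(",")} are alive")
  }

  // Post-fix behaviour: always resume and rely on ISR change notifications.
  def resumeNew(r: Reassignment): Reassignment = r

  // Evaluated on each ISR change (assumed rule): the reassignment is complete
  // once every new replica has joined the ISR.
  def isComplete(r: Reassignment, isr: Set[Int]): Boolean =
    r.newReplicas.forall(isr.contains)

  def main(args: Array[String]): Unit = {
    val r = Reassignment(Seq(1, 2, 3))
    val liveAtFailover = Set(1, 2) // broker 3 is being bounced
    println(resumeOld(r, liveAtFailover).isLeft)    // old check rejects the reassignment
    println(isComplete(resumeNew(r), Set(1, 2)))    // broker 3 has not caught up yet
    println(isComplete(resumeNew(r), Set(1, 2, 3))) // broker 3 is back in the ISR
  }
}
```

With the old check, the reassignment is rejected as soon as one target broker is
down at failover; with the new behaviour it stays pending until a later ISR
change shows all new replicas in sync.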

Interestingly, the block of code in question was introduced in
KAFKA-990, where a concern about this exact scenario was raised :)

This bug was revealed in the system tests in https://github.com/apache/kafka/pull/1904.
The relevant tests will be enabled in either this or a followup PR when PR-1904 is merged.

Thanks to junrao for identifying the issue and providing the patch.

Author: Apurva Mehta <apurva.1618@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1910 from apurvam/KAFKA-4214

(cherry picked from commit be6056abc9970600347c95c4c8658799b76dbe6b)
Signed-off-by: Jun Rao <junrao@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/823c0806
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/823c0806
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/823c0806

Branch: refs/heads/0.10.1
Commit: 823c08067c189791d97ca4bd0548ffa308684dd7
Parents: d6b3ff1
Author: Apurva Mehta <apurva.1618@gmail.com>
Authored: Mon Sep 26 17:18:18 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Sep 26 17:18:29 2016 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 21 +++++++-------------
 1 file changed, 7 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/823c0806/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 04bd3f4..063ea6f 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -631,20 +631,13 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
             throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
               " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
           } else {
-            if(aliveNewReplicas == newReplicas) {
-              info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
-              // first register ISR change listener
-              watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
-              controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
-              // mark topic ineligible for deletion for the partitions being reassigned
-              deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
-              onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-            } else {
-              // some replica in RAR is not alive. Fail partition reassignment
-              throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
-                " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
-                "Failing partition reassignment")
-            }
+            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
+            // first register ISR change listener
+            watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
+            controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+            // mark topic ineligible for deletion for the partitions being reassigned
+            deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
+            onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
           }
         case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
           .format(topicAndPartition))

