kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-4214; kafka-reassign-partitions fails all the time when brokers are bounced during reassignment
Date Tue, 27 Sep 2016 00:18:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b75245cfb -> be6056abc


KAFKA-4214; kafka-reassign-partitions fails all the time when brokers are bounced during reassignment

There is a corner-case bug: if the controller and a broker receiving a
new replica are bounced at the same time during partition reassignment,
the reassignment fails.

The cause of this bug is a block of code in the KafkaController which
fails the reassignment if aliveNewReplicas != newReplicas, i.e. if
some of the new replicas are offline at the time a controller fails
over.

The fix is to have the controller listen for ISR change events even for
new replicas which are not alive when the controller boots up. Once
those replicas come online, they will be in the ISR set, and the new
controller will detect this and mark the reassignment as successful.
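
As a rough illustration of the difference, here is a minimal sketch in
plain Scala. It is a simplified model, not the actual KafkaController
code: the object ReassignmentSketch, the Reassignment case class, and
the functions oldInitiate, newInitiate and isComplete are hypothetical
names introduced only for this example.

```scala
// Simplified, hypothetical model of the behaviour described above.
// None of these names exist in Kafka; they only illustrate the logic.
object ReassignmentSketch {
  final case class Reassignment(newReplicas: Seq[Int])

  // Old behaviour: the reassignment is failed outright if any new replica
  // is offline at the moment the controller processes it.
  def oldInitiate(r: Reassignment, liveBrokers: Set[Int]): Either[String, String] = {
    val aliveNewReplicas = r.newReplicas.filter(liveBrokers.contains)
    if (aliveNewReplicas == r.newReplicas) Right("in progress")
    else Left("Failing partition reassignment")
  }

  // New behaviour: the reassignment is always registered, regardless of
  // which new replicas are currently alive.
  def newInitiate(r: Reassignment): String = "in progress"

  // Stand-in for what the ISR change listener checks: the reassignment
  // can be marked successful once every new replica is in the ISR.
  def isComplete(r: Reassignment, isr: Set[Int]): Boolean =
    r.newReplicas.forall(isr.contains)
}
```

With newReplicas = Seq(1, 2, 3) and broker 3 offline, oldInitiate
returns a Left, while newInitiate keeps the reassignment in progress
and isComplete becomes true once broker 3 rejoins the ISR.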

Interestingly, the block of code in question was introduced in
KAFKA-990, where a concern about this exact scenario was raised :)

This bug was revealed in the system tests in https://github.com/apache/kafka/pull/1904.
The relevant tests will be enabled in either this or a followup PR when PR-1904 is merged.

Thanks to junrao for identifying the issue and providing the patch.

Author: Apurva Mehta <apurva.1618@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1910 from apurvam/KAFKA-4214


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be6056ab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be6056ab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be6056ab

Branch: refs/heads/trunk
Commit: be6056abc9970600347c95c4c8658799b76dbe6b
Parents: b75245c
Author: Apurva Mehta <apurva.1618@gmail.com>
Authored: Mon Sep 26 17:18:18 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Sep 26 17:18:18 2016 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 21 +++++++-------------
 1 file changed, 7 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/be6056ab/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 04bd3f4..063ea6f 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -631,20 +631,13 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
             throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
               " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
           } else {
-            if(aliveNewReplicas == newReplicas) {
-              info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
-              // first register ISR change listener
-              watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
-              controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
-              // mark topic ineligible for deletion for the partitions being reassigned
-              deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
-              onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-            } else {
-              // some replica in RAR is not alive. Fail partition reassignment
-              throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
-                " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
-                "Failing partition reassignment")
-            }
+            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
+            // first register ISR change listener
+            watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
+            controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+            // mark topic ineligible for deletion for the partitions being reassigned
+            deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
+            onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
           }
         case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
           .format(topicAndPartition))

