kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1396814 - in /incubator/kafka/branches/0.8: bin/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/controller/
Date Wed, 10 Oct 2012 21:43:45 GMT
Author: nehanarkhede
Date: Wed Oct 10 21:43:44 2012
New Revision: 1396814

URL: http://svn.apache.org/viewvc?rev=1396814&view=rev
Log:
KAFKA-42 part 2: new files

Added:
    incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh   (with props)
    incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh   (with props)
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/AdminCommandFailedException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala

Added: incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh?rev=1396814&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh (added)
+++ incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh Wed Oct 10 21:43:44 2012
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.CheckReassignmentStatus $@

Propchange: incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh?rev=1396814&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh (added)
+++ incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh Wed Oct 10 21:43:44 2012
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand $@

Propchange: incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh
------------------------------------------------------------------------------
    svn:executable = *
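
Note (not part of this commit): both wrapper scripts simply delegate to kafka-run-class.sh with the
corresponding admin class and pass any command-line arguments through. Per the option parsers in the
Scala sources below, each tool registers a "zookeeper" option and an option literally named
"path to json file" for the reassignment JSON file; the expected file contents, taken verbatim from
the ReassignPartitionsCommand help text further down, look like this:

    [{"topic": "foo", "partition": "1", "replicas": "1,2,3" }]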

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala?rev=1396814&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala Wed Oct 10 21:43:44 2012
@@ -0,0 +1,97 @@
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils._
+import scala.collection.Map
+
+object CheckReassignmentStatus extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val jsonFileOpt = parser.accepts("path to json file", "REQUIRED: The JSON file with the list of partitions and the " +
+      "new replicas they should be reassigned to")
+      .withRequiredArg
+      .describedAs("partition reassignment json file path")
+      .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
+      "form host:port. Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(jsonFileOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val jsonFile = options.valueOf(jsonFileOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val jsonString = Utils.readFileIntoString(jsonFile)
+    val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+
+    try {
+      // read the json file into a string
+      val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match {
+        case Some(reassignedPartitions) =>
+          val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
+          partitions.map { m =>
+            val topic = m.asInstanceOf[Map[String, String]].get("topic").get
+            val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
+            val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
+            val newReplicas = replicasList.split(",").map(_.toInt)
+            ((topic, partition), newReplicas.toSeq)
+          }.toMap
+        case None => Map.empty[(String, Int), Seq[Int]]
+      }
+
+      val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
+      reassignedPartitionsStatus.foreach { partition =>
+        partition._2 match {
+          case ReassignmentCompleted =>
+            println("Partition [%s,%d] reassignment to %s completed successfully".format(partition._1,
partition._2,
+            partitionsToBeReassigned((partition._1._1, partition._1._2))))
+          case ReassignmentFailed =>
+            println("Partition [%s,%d] reassignment to %s failed".format(partition._1, partition._2,
+            partitionsToBeReassigned((partition._1._1, partition._1._2))))
+          case ReassignmentInProgress =>
+            println("Partition [%s,%d] reassignment to %s in progress".format(partition._1,
partition._2,
+            partitionsToBeReassigned((partition._1._1, partition._1._2))))
+        }
+      }
+    }
+  }
+
+  def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[(String, Int), Seq[Int]])
+  :Map[(String, Int), ReassignmentStatus] = {
+    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
+    // for all partitions whose replica reassignment is complete, check the status
+    partitionsToBeReassigned.map { topicAndPartition =>
+      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1._1, topicAndPartition._1._2,
+        topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
+    }
+  }
+
+  def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topic: String, partition: Int,
+                                            reassignedReplicas: Seq[Int],
+                                            partitionsToBeReassigned: Map[(String, Int), Seq[Int]],
+                                            partitionsBeingReassigned: Map[(String, Int), Seq[Int]]): ReassignmentStatus = {
+    val newReplicas = partitionsToBeReassigned((topic, partition))
+    partitionsBeingReassigned.get((topic, partition)) match {
+      case Some(partition) => ReassignmentInProgress
+      case None =>
+        // check if AR == RAR
+        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition)
+        if(assignedReplicas == newReplicas)
+          ReassignmentCompleted
+        else
+          ReassignmentFailed
+    }
+  }
+}
\ No newline at end of file
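
Note (not part of this commit): a minimal standalone sketch of the status decision made by
checkIfPartitionReassignmentSucceeded above, with the ZooKeeper reads replaced by plain arguments so
the three outcomes are easy to see; the function name and parameters here are illustrative only, and
the ReassignmentStatus values are the ones defined in ReassignPartitionsCommand.scala below.

    def reassignmentStatus(newReplicas: Seq[Int],
                           assignedReplicas: Seq[Int],
                           stillBeingReassigned: Boolean): ReassignmentStatus = {
      if (stillBeingReassigned)
        ReassignmentInProgress                     // partition still listed under the reassignment admin path
      else if (assignedReplicas == newReplicas)
        ReassignmentCompleted                      // assigned replicas now match the requested replicas (AR == RAR)
      else
        ReassignmentFailed
    }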

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala?rev=1396814&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala Wed Oct 10 21:43:44 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import kafka.common.AdminCommandFailedException
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+
+object ReassignPartitionsCommand extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val jsonFileOpt = parser.accepts("path to json file", "REQUIRED: The JSON file with the list of partitions and the " +
+      "new replicas they should be reassigned to in the following format - \n" +
+       "[{\"topic\": \"foo\", \"partition\": \"1\", \"replicas\": \"1,2,3\" }]")
+      .withRequiredArg
+      .describedAs("partition reassignment json file path")
+      .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
+      "form host:port. Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(jsonFileOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val jsonFile = options.valueOf(jsonFileOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val jsonString = Utils.readFileIntoString(jsonFile)
+    var zkClient: ZkClient = null
+
+    try {
+      // read the json file into a string
+      val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match {
+        case Some(reassignedPartitions) =>
+          val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
+          partitions.map { m =>
+            val topic = m.asInstanceOf[Map[String, String]].get("topic").get
+            val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
+            val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
+            val newReplicas = replicasList.split(",").map(_.toInt)
+            ((topic, partition), newReplicas.toSeq)
+          }.toMap
+        case None => throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))
+      }
+
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+
+      // attach shutdown handler to catch control-c
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        override def run() = {
+          // delete the admin path so it can be retried
+          ZkUtils.deletePathRecursive(zkClient, ZkUtils.ReassignPartitionsPath)
+        }
+      })
+
+      if(reassignPartitionsCommand.reassignPartitions())
+        println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+      else
+        println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+    } catch {
+      case e =>
+        println("Partitions reassignment failed due to " + e.getMessage)
+        println(Utils.stackTrace(e))
+    } finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+}
+
+class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.immutable.Map[(String, Int), Seq[Int]])
+  extends Logging {
+  def reassignPartitions(): Boolean = {
+    try {
+      val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1._1, p._1._2))
+      val jsonReassignmentData = Utils.mapToJson(validPartitions.map(p =>
+        ("%s,%s".format(p._1._1, p._1._2)) -> p._2.map(_.toString)))
+      ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
+      true
+    }catch {
+      case ze: ZkNodeExistsException =>
+        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
+        throw new AdminCommandFailedException("Partition reassignment currently in " +
+        "progress for %s. Aborting operation".format(partitionsBeingReassigned))
+      case e => error("Admin command failed", e); false
+    }
+  }
+
+  def validatePartition(zkClient: ZkClient, topic: String, partition: Int): Boolean = {
+    // check if partition exists
+    val partitionsOpt = ZkUtils.getPartitionsForTopics(zkClient, List(topic)).get(topic)
+    partitionsOpt match {
+      case Some(partitions) =>
+        if(partitions.contains(partition)) {
+          true
+        }else{
+          error("Skipping reassignment of partition [%s,%d] ".format(topic, partition) +
+            "since it doesn't exist")
+          false
+        }
+      case None => error("Skipping reassignment of partition " +
+        "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
+        false
+    }
+  }
+}
+
+sealed trait ReassignmentStatus { def status: Int }
+case object ReassignmentCompleted extends ReassignmentStatus { val status = 1 }
+case object ReassignmentInProgress extends ReassignmentStatus { val status = 0 }
+case object ReassignmentFailed extends ReassignmentStatus { val status = -1 }
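
Note (not part of this commit): a hedged sketch of the data reassignPartitions() stages under
ZkUtils.ReassignPartitionsPath for the example input from the help text, assuming Utils.mapToJson
serializes a Map whose keys are "topic,partition" strings and whose values are the new replica ids
as strings; the value names here are illustrative only.

    val partitionsToMove = Map(("foo", 1) -> Seq(1, 2, 3))
    val zkData = partitionsToMove.map { case ((topic, partition), replicas) =>
      "%s,%s".format(topic, partition) -> replicas.map(_.toString)
    }
    // zkData == Map("foo,1" -> Seq("1", "2", "3")); it is serialized and written to the
    // reassignment admin path, and the shutdown hook in main() deletes that path so a
    // failed or interrupted run can be retried.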

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/AdminCommandFailedException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/AdminCommandFailedException.scala?rev=1396814&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/AdminCommandFailedException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/AdminCommandFailedException.scala Wed Oct 10 21:43:44 2012
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class AdminCommandFailedException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this() = this(null, null)
+}

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala?rev=1396814&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala Wed Oct 10 21:43:44 2012
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import kafka.api.LeaderAndIsr
+import kafka.utils.Logging
+import kafka.common.{StateChangeFailedException, PartitionOfflineException}
+
+trait PartitionLeaderSelector {
+
+  /**
+   * @param topic                      The topic of the partition whose leader needs to be elected
+   * @param partition                  The partition whose leader needs to be elected
+   * @param assignedReplicas           The list of replicas assigned to the input partition
+   * @param currentLeaderAndIsr        The current leader and isr of input partition read from zookeeper
+   * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
+   * @returns The leader and isr request, with the newly selected leader info, to send to the brokers
+   * Also, returns the list of replicas the returned leader and isr request should be sent to
+   * This API selects a new leader for the input partition
+   */
+  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
+
+}
+
+/**
+ * This API selects a new leader for the input partition -
+ * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
+ * 2. Else, it picks some alive broker from the assigned replica list as the new leader
+ * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
+ * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
+ */
+class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+
+  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    controllerContext.partitionReplicaAssignment.get((topic, partition)) match {
+      case Some(assignedReplicas) =>
+        val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
+        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
+        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+        debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d],
[%s], [%d]"
+          .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
+          currentLeaderIsrZkPathVersion))
+        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
+          case true =>
+            debug("No broker is ISR is alive, picking the leader from the alive assigned
replicas: %s"
+              .format(liveAssignedReplicasToThisPartition.mkString(",")))
+            liveAssignedReplicasToThisPartition.isEmpty match {
+              case true =>
+                ControllerStat.offlinePartitionRate.mark()
+                throw new PartitionOfflineException(("No replica for partition " +
+                  "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition,
controllerContext.liveBrokerIds)) +
+                  " Assigned replicas are: [%s]".format(assignedReplicas))
+              case false =>
+                ControllerStat.uncleanLeaderElectionRate.mark()
+                val newLeader = liveAssignedReplicasToThisPartition.head
+                warn("No broker in ISR is alive, elected leader from the alive replicas is
[%s], ".format(newLeader) +
+                  "There's potential data loss")
+                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
+            }
+          case false =>
+            val newLeader = liveBrokersInIsr.head
+            debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
+            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
+        }
+        info("Selected new leader and ISR %s for offline partition [%s, %d]".format(newLeaderAndIsr.toString(),
topic,
+          partition))
+        (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
+      case None =>
+        ControllerStat.offlinePartitionRate.mark()
+        throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
+                                            "replicas assigned to it")
+    }
+  }
+}
+
+/**
+ * Picks one of the alive in-sync reassigned replicas as the new leader
+ */
+class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+
+  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    val reassignedReplicas = controllerContext.partitionsBeingReassigned((topic, partition)).newReplicas
+    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
+    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+    debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s],
[%d]"
+      .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
+      currentLeaderIsrZkPathVersion))
+    // pick any replica from the newly assigned replicas list that is in the ISR
+    val aliveReassignedReplicas = reassignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+    val newLeaderOpt = aliveReassignedReplicas.headOption
+    newLeaderOpt match {
+      case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
+                              currentLeaderIsrZkPathVersion + 1), reassignedReplicas)
+      case None =>
+        reassignedReplicas.size match {
+          case 0 =>
+            throw new StateChangeFailedException("List of reassigned replicas for partition
" +
+              "([%s, %d]) is empty. Current leader and ISR: [%s]".format(topic, partition,
currentLeaderAndIsr))
+          case _ =>
+            throw new StateChangeFailedException("None of the reassigned replicas for partition
" +
+              "([%s, %d]) are alive. Current leader and ISR: [%s]".format(topic, partition,
currentLeaderAndIsr))
+        }
+    }
+  }
+}
\ No newline at end of file
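
Note (not part of this commit): a self-contained sketch of the decision OfflinePartitionLeaderSelector
makes, with the controller context reduced to plain collections; the function name is illustrative
only and the exception is a stand-in for PartitionOfflineException.

    def pickOfflineLeader(assignedReplicas: Seq[Int], isr: Seq[Int], liveBrokers: Set[Int]): Int = {
      val liveIsr      = isr.filter(liveBrokers.contains)
      val liveAssigned = assignedReplicas.filter(liveBrokers.contains)
      if (liveIsr.nonEmpty) liveIsr.head                 // 1. prefer a live broker from the ISR
      else if (liveAssigned.nonEmpty) liveAssigned.head  // 2. unclean election from a live assigned replica (potential data loss)
      else throw new IllegalStateException("no assigned replica is alive")  // 3. the partition is offline
    }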


