kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Make LeaderAndIsr immutable case class
Date Wed, 12 Apr 2017 00:03:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 256f8d566 -> 1d25369d2


MINOR: Make LeaderAndIsr immutable case class

Also include a few code readability improvements.

Author: jozi-k <jozef.koval@protonmail.ch>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2731 from jozi-k/immutable_LeaderAndIsr


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d25369d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d25369d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d25369d

Branch: refs/heads/trunk
Commit: 1d25369d22f47567a7117dede14b62b8586f394a
Parents: 256f8d5
Author: jozi-k <jozef.koval@protonmail.ch>
Authored: Wed Apr 12 00:57:08 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Apr 12 01:01:59 2017 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/LeaderAndIsr.scala |  25 ++--
 .../controller/ControllerChannelManager.scala   |  18 +--
 .../kafka/controller/KafkaController.scala      |  32 ++---
 .../controller/PartitionLeaderSelector.scala    | 117 +++++++++----------
 .../controller/PartitionStateMachine.scala      |  61 +++++-----
 .../ControlledShutdownLeaderSelectorTest.scala  |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  36 +++---
 7 files changed, 152 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d25369d/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index e68ad86..8c3b7e5 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -25,12 +25,25 @@ import scala.collection.Set
 object LeaderAndIsr {
   val initialLeaderEpoch: Int = 0
   val initialZKVersion: Int = 0
-  val NoLeader = -1
-  val LeaderDuringDelete = -2
+  val NoLeader: Int = -1
+  val LeaderDuringDelete: Int = -2
+
+  def apply(leader: Int, isr: List[Int]): LeaderAndIsr = LeaderAndIsr(leader, initialLeaderEpoch, isr, initialZKVersion)
+
+  def duringDelete(isr: List[Int]): LeaderAndIsr = LeaderAndIsr(LeaderDuringDelete, isr)
 }
 
-case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
-  def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
+case class LeaderAndIsr(leader: Int,
+                        leaderEpoch: Int,
+                        isr: List[Int],
+                        zkVersion: Int) {
+  def withZkVersion(zkVersion: Int) = copy(zkVersion = zkVersion)
+
+  def newLeader(leader: Int) = newLeaderAndIsr(leader, isr)
+
+  def newLeaderAndIsr(leader: Int, isr: List[Int]) = LeaderAndIsr(leader, leaderEpoch + 1, isr, zkVersion + 1)
+
+  def newEpochAndZkVersion = newLeaderAndIsr(leader, isr)
 
   override def toString: String = {
     Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
@@ -39,12 +52,10 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int
 
 case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Set[Int]) {
 
-  def replicationFactor = allReplicas.size
-
   override def toString: String = {
     val partitionStateInfo = new StringBuilder
     partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
-    partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
+    partitionStateInfo.append(",ReplicationFactor:" + allReplicas.size + ")")
     partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
     partitionStateInfo.toString()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d25369d/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 194cfcc..102ebd8 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -311,20 +311,24 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
 
   /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
-                                         partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
-                                         callback: AbstractResponse => Unit = null) {
+                                         partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+
     def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) {
       val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
       leaderIsrAndControllerEpochOpt match {
-        case Some(leaderIsrAndControllerEpoch) =>
+        case Some(l @ LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)) =>
           val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
-          val partitionStateInfo = if (beingDeleted) {
-            val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
-            PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
+
+          val leaderIsrAndControllerEpoch = if (beingDeleted) {
+            val leaderDuringDelete = LeaderAndIsr.duringDelete(leaderAndIsr.isr)
+            LeaderIsrAndControllerEpoch(leaderDuringDelete, controllerEpoch)
           } else {
-            PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
+            l
           }
+
+          val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
           updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
+
         case None =>
           info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d25369d/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 774316b..97ad198 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1081,21 +1081,21 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
               newIsr = leaderAndIsr.isr
             }
 
-            val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch +
1,
-              newIsr, leaderAndIsr.zkVersion + 1)
+            val newLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
             // update the new leadership decision in zookeeper or retry
             val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
topic, partition,
               newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
 
-            newLeaderAndIsr.zkVersion = newVersion
-            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr,
epoch))
+            val leaderWithNewVersion = newLeaderAndIsr.withZkVersion(newVersion)
+            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderWithNewVersion,
epoch))
             controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
-            if (updateSucceeded)
-              info("New leader and ISR for partition %s is %s".format(topicAndPartition,
newLeaderAndIsr.toString()))
+            if (updateSucceeded) {
+              info(s"New leader and ISR for partition $topicAndPartition is $leaderWithNewVersion")
+            }
             updateSucceeded
           } else {
-            warn("Cannot remove replica %d from ISR of partition %s since it is not in the
ISR. Leader = %d ; ISR = %s"
-                 .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
+            warn(s"Cannot remove replica $replicaId from ISR of partition $topicAndPartition
since it is not in the ISR." +
+              s" Leader = ${leaderAndIsr.leader} ; ISR = ${leaderAndIsr.isr}")
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr,
epoch))
             controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
             true
@@ -1133,20 +1133,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val
brokerState
               "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
           // increment the leader epoch even if there are no leader or isr changes to allow
the leader to cache the expanded
           // assigned replica list
-          val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch
+ 1,
-                                                 leaderAndIsr.isr, leaderAndIsr.zkVersion
+ 1)
+          val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
           // update the new leadership decision in zookeeper or retry
           val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
topic,
             partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
 
-          newLeaderAndIsr.zkVersion = newVersion
-          finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr,
epoch))
-          if (updateSucceeded)
-            info("Updated leader epoch for partition %s to %d".format(topicAndPartition,
newLeaderAndIsr.leaderEpoch))
+          val leaderWithNewVersion = newLeaderAndIsr.withZkVersion(newVersion)
+          finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderWithNewVersion,
epoch))
+          if (updateSucceeded) {
+            info(s"Updated leader epoch for partition $topicAndPartition to ${leaderWithNewVersion.leaderEpoch}")
+          }
           updateSucceeded
         case None =>
-          throw new IllegalStateException(("Cannot update leader epoch for partition %s as
leaderAndIsr path is empty. " +
-            "This could mean we somehow tried to reassign a partition that doesn't exist").format(topicAndPartition))
+          throw new IllegalStateException(s"Cannot update leader epoch for partition $topicAndPartition
as " +
+            "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition
that doesn't exist")
           true
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d25369d/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 8171914..8d99fe2 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -48,49 +48,49 @@ trait PartitionLeaderSelector {
  */
 class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
   extends PartitionLeaderSelector with Logging {
-  this.logIdent = "[OfflinePartitionLeaderSelector]: "
+
+  logIdent = "[OfflinePartitionLeaderSelector]: "
 
   def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr):
(LeaderAndIsr, Seq[Int]) = {
     controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
       case Some(assignedReplicas) =>
         val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
         val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
-        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
-        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
         val newLeaderAndIsr =
           if (liveBrokersInIsr.isEmpty) {
             // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is
not disallowed by the configuration
             // for unclean leader election.
             if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
               ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
-              throw new NoReplicaOnlineException(("No broker in ISR for partition " +
-                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds))
+
-                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
+              throw new NoReplicaOnlineException(
+                s"No broker in ISR for partition $topicAndPartition is alive. Live brokers
are: [${controllerContext.liveBrokerIds}], " +
+                  s"ISR brokers are: [${currentLeaderAndIsr.isr.mkString(",")}]"
+              )
             }
-            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned
replicas: %s"
-              .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
+            debug(s"No broker in ISR is alive for $topicAndPartition. Pick the leader from
the alive assigned " +
+              s"replicas: ${liveAssignedReplicas.mkString(",")}")
+
             if (liveAssignedReplicas.isEmpty) {
-              throw new NoReplicaOnlineException(("No replica for partition " +
-                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds))
+
-                " Assigned replicas are: [%s]".format(assignedReplicas))
+              throw new NoReplicaOnlineException(s"No replica for partition $topicAndPartition
is alive. Live " +
+                s"brokers are: [${controllerContext.liveBrokerIds}]. Assigned replicas are:
[$assignedReplicas].")
             } else {
               ControllerStats.uncleanLeaderElectionRate.mark()
               val newLeader = liveAssignedReplicas.head
-              warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s.
There's potential data loss."
-                .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
-              new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion
+ 1)
+              warn(s"No broker in ISR is alive for $topicAndPartition. Elect leader $newLeader
from live " +
+                s"brokers ${liveAssignedReplicas.mkString(",")}. There's potential data loss.")
+              currentLeaderAndIsr.newLeaderAndIsr(newLeader, List(newLeader))
             }
           } else {
             val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
             val newLeader = liveReplicasInIsr.head
-            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
-              .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
-            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList,
currentLeaderIsrZkPathVersion + 1)
+            debug(s"Some broker in ISR is alive for $topicAndPartition. Select $newLeader
from ISR " +
+              s"${liveBrokersInIsr.mkString(",")} to be the leader.")
+            currentLeaderAndIsr.newLeaderAndIsr(newLeader, liveBrokersInIsr)
           }
-        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(),
topicAndPartition))
+        info(s"Selected new leader and ISR $newLeaderAndIsr for offline partition $topicAndPartition")
         (newLeaderAndIsr, liveAssignedReplicas)
       case None =>
-        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to
it".format(topicAndPartition))
+        throw new NoReplicaOnlineException(s"Partition $topicAndPartition doesn't have replicas
assigned to it")
     }
   }
 }
@@ -101,30 +101,29 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext,
confi
  * Replicas to receive LeaderAndIsr request = reassigned replicas
  */
 class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
with Logging {
-  this.logIdent = "[ReassignedPartitionLeaderSelector]: "
+
+  logIdent = "[ReassignedPartitionLeaderSelector]: "
 
   /**
    * The reassigned replicas are already in the ISR when selectLeader is called.
    */
-  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr):
(LeaderAndIsr, Seq[Int]) = {
+  def selectLeader(topicAndPartition: TopicAndPartition,
+                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
-    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
-    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
-    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)
&&
-                                                                             currentLeaderAndIsr.isr.contains(r))
-    val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
+    val newLeaderOpt = reassignedInSyncReplicas.find { r =>
+      controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r)
+    }
     newLeaderOpt match {
-      case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
-        currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
+      case Some(newLeader) => (currentLeaderAndIsr.newLeader(newLeader), reassignedInSyncReplicas)
       case None =>
-        reassignedInSyncReplicas.size match {
-          case 0 =>
-            throw new NoReplicaOnlineException("List of reassigned replicas for partition
" +
-              " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
-          case _ =>
-            throw new NoReplicaOnlineException("None of the reassigned replicas for partition
" +
-              "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition,
currentLeaderAndIsr))
+        val errorMessage = if (reassignedInSyncReplicas.isEmpty) {
+          s"List of reassigned replicas for partition $topicAndPartition is empty. Current
leader and ISR: " +
+            s"[$currentLeaderAndIsr]"
+        } else {
+          s"None of the reassigned replicas for partition $topicAndPartition are in-sync
with the leader. " +
+            s"Current leader and ISR: [$currentLeaderAndIsr]"
         }
+        throw new NoReplicaOnlineException(errorMessage)
     }
   }
 }
@@ -134,11 +133,12 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext)
ex
  * New isr = current isr;
  * Replicas to receive LeaderAndIsr request = assigned replicas
  */
-class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends
PartitionLeaderSelector
-with Logging {
-  this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
+class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends
PartitionLeaderSelector with Logging {
 
-  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr):
(LeaderAndIsr, Seq[Int]) = {
+  logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
+
+  def selectLeader(topicAndPartition: TopicAndPartition,
+                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
@@ -151,11 +151,11 @@ with Logging {
         " Triggering preferred replica leader election")
       // check if preferred replica is not the current leader and is alive and in the isr
       if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica))
{
-        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
-          currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
+        val newLeaderAndIsr = currentLeaderAndIsr.newLeader(preferredReplica)
+        (newLeaderAndIsr, assignedReplicas)
       } else {
-        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica)
+
-          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition,
currentLeaderAndIsr))
+        throw new StateChangeFailedException(s"Preferred replica $preferredReplica for partition
$topicAndPartition " +
+          s"is either not alive or not in the isr. Current leader and ISR: [$currentLeaderAndIsr]")
       }
     }
   }
@@ -166,30 +166,26 @@ with Logging {
  * New isr = current isr - shutdown replica;
  * Replicas to receive LeaderAndIsr request = live assigned replicas
  */
-class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
-        extends PartitionLeaderSelector
-        with Logging {
-
-  this.logIdent = "[ControlledShutdownLeaderSelector]: "
-
-  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr):
(LeaderAndIsr, Seq[Int]) = {
-    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
-    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
with Logging {
 
-    val currentLeader = currentLeaderAndIsr.leader
+  logIdent = "[ControlledShutdownLeaderSelector]: "
 
+  def selectLeader(topicAndPartition: TopicAndPartition,
+                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    val currentIsr = currentLeaderAndIsr.isr
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
     val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
 
-    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
+    val newIsr = currentIsr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
     liveAssignedReplicas.find(newIsr.contains) match {
       case Some(newLeader) =>
-        debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition,
currentLeader, newLeader))
-        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion
+ 1), liveAssignedReplicas)
+        debug(s"Partition $topicAndPartition : current leader = ${currentLeaderAndIsr.leader},
new leader = $newLeader")
+        val newLeaderAndIsr = currentLeaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
+        (newLeaderAndIsr, liveAssignedReplicas)
       case None =>
-        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides"
+
-          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition,
controllerContext.shuttingDownBrokerIds.mkString(",")))
+        throw new StateChangeFailedException(s"No other replicas in ISR ${currentIsr.mkString(",")}
for $topicAndPartition " +
+          s"besides shutting down brokers ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
     }
   }
 }
@@ -200,9 +196,10 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
  */
 class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
with Logging {
 
-  this.logIdent = "[NoOpLeaderSelector]: "
+  logIdent = "[NoOpLeaderSelector]: "
 
-  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr):
(LeaderAndIsr, Seq[Int]) = {
+  def selectLeader(topicAndPartition: TopicAndPartition,
+                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     warn("I should never have been asked to perform leader election, returning the current
LeaderAndIsr and replica assignment.")
     (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d25369d/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index c0b94b1..0eba51a 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -266,42 +266,52 @@ class PartitionStateMachine(controller: KafkaController) extends Logging
{
    * OfflinePartition state.
    * @param topicAndPartition   The topic/partition whose leader and isr path is to be initialized
    */
-  private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
-    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
-    val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
-    liveAssignedReplicas.size match {
-      case 0 =>
-        val failMsg = ("encountered error during state change of partition %s from New to
Online, assigned replicas are [%s], " +
-                       "live brokers are [%s]. No assigned replica is alive.")
-                         .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
-        stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch)
+ failMsg)
+  private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) =
{
+    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition).toList
+    val liveAssignedReplicas = replicaAssignment.filter(controllerContext.liveBrokerIds.contains)
+    liveAssignedReplicas.headOption match {
+      case None =>
+        val failMsg = s"Controller $controllerId epoch ${controller.epoch} encountered error
during state change of " +
+          s"partition $topicAndPartition from New to Online, assigned replicas are " +
+          s"[${replicaAssignment.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}].
No assigned " +
+          "replica is alive."
+
+        stateChangeLogger.error(failMsg)
         throw new StateChangeFailedException(failMsg)
-      case _ =>
-        debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition,
liveAssignedReplicas))
-        // make the first replica in the list of assigned replicas, the leader
-        val leader = liveAssignedReplicas.head
-        val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader,
liveAssignedReplicas.toList),
-          controller.epoch)
-        debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition,
leaderIsrAndControllerEpoch))
+
+      // leader is the first replica in the list of assigned replicas
+      case Some(leader) =>
+        debug(s"Live assigned replicas for partition $topicAndPartition are: [$liveAssignedReplicas]")
+        val leaderAndIsr = LeaderAndIsr(leader, liveAssignedReplicas)
+        val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controller.epoch)
+        debug(s"Initializing leader and isr for partition $topicAndPartition to $leaderIsrAndControllerEpoch")
+
         try {
           zkUtils.createPersistentPath(
             getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
-            zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
+            zkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch)
+          )
           // NOTE: the above write can fail only if the current controller lost its zk session
and the new controller
           // took over and initialized this partition. This can happen if the current controller
went into a long
           // GC pause
           controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
-            topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
+            liveAssignedReplicas,
+            topicAndPartition.topic,
+            topicAndPartition.partition,
+            leaderIsrAndControllerEpoch,
+            replicaAssignment
+          )
         } catch {
           case _: ZkNodeExistsException =>
             // read the controller epoch
             val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils,
topicAndPartition.topic,
               topicAndPartition.partition).get
-            val failMsg = ("encountered error while changing partition %s's state from New
to Online since LeaderAndIsr path already " +
-                           "exists with value %s and controller epoch %d")
-                             .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(),
leaderIsrAndEpoch.controllerEpoch)
-            stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch)
+ failMsg)
+
+            val failMsg = s"Controller $controllerId epoch ${controller.epoch} encountered
error while changing " +
+              s"partition $topicAndPartition's state from New to Online since LeaderAndIsr
path already exists with " +
+              s"value ${leaderIsrAndEpoch.leaderAndIsr} and controller epoch ${leaderIsrAndEpoch.controllerEpoch}"
+            stateChangeLogger.error(failMsg)
             throw new StateChangeFailedException(failMsg)
         }
     }
@@ -339,12 +349,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
         val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
           leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
-        newLeaderAndIsr = leaderAndIsr
-        newLeaderAndIsr.zkVersion = newVersion
+        newLeaderAndIsr = leaderAndIsr.withZkVersion(newVersion)
         zookeeperPathUpdateSucceeded = updateSucceeded
         replicasForThisPartition = replicas
       }
-      val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
+      val newLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
       controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
       stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d25369d/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
index 47a05ef..4b90767 100644
--- a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
@@ -45,7 +45,7 @@ class ControlledShutdownLeaderSelectorTest {
     controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment)
 
     val leaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-    val firstLeaderAndIsr = new LeaderAndIsr(firstLeader, firstIsr)
+    val firstLeaderAndIsr = LeaderAndIsr(firstLeader, firstIsr)
     val (secondLeaderAndIsr, secondReplicas) = leaderSelector.selectLeader(topicPartition, firstLeaderAndIsr)
 
     assertEquals(preferredReplicaId, secondLeaderAndIsr.leader)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d25369d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3dbe2de..cd9f0b1 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -700,30 +700,22 @@ object TestUtils extends Logging {
     new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
   }
 
-  def makeLeaderForPartition(zkUtils: ZkUtils, topic: String,
+  def makeLeaderForPartition(zkUtils: ZkUtils,
+                             topic: String,
                              leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
                              controllerEpoch: Int) {
-    leaderPerPartitionMap.foreach
-    {
-      leaderForPartition => {
-        val partition = leaderForPartition._1
-        val leader = leaderForPartition._2
-        try{
-          val currentLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
-          var newLeaderAndIsr: LeaderAndIsr = null
-          if(currentLeaderAndIsrOpt.isEmpty)
-            newLeaderAndIsr = new LeaderAndIsr(leader, List(leader))
-          else{
-            newLeaderAndIsr = currentLeaderAndIsrOpt.get
-            newLeaderAndIsr.leader = leader
-            newLeaderAndIsr.leaderEpoch += 1
-            newLeaderAndIsr.zkVersion += 1
-          }
-          zkUtils.updatePersistentPath(getTopicPartitionLeaderAndIsrPath(topic, partition),
-            zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
-        } catch {
-          case oe: Throwable => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe)
-        }
+    leaderPerPartitionMap.foreach { case (partition, leader) =>
+      try {
+        val newLeaderAndIsr = zkUtils.getLeaderAndIsrForPartition(topic, partition)
+          .map(_.newLeader(leader))
+          .getOrElse(LeaderAndIsr(leader, List(leader)))
+
+        zkUtils.updatePersistentPath(
+          getTopicPartitionLeaderAndIsrPath(topic, partition),
+          zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
+        )
+      } catch {
+        case oe: Throwable => error(s"Error while electing leader for partition [$topic,$partition]", oe)
       }
     }
   }


Mime
View raw message