kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1409618 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller: KafkaController.scala PartitionLeaderSelector.scala PartitionStateMachine.scala ReplicaStateMachine.scala
Date Thu, 15 Nov 2012 00:28:29 GMT
Author: nehanarkhede
Date: Thu Nov 15 00:28:28 2012
New Revision: 1409618

URL: http://svn.apache.org/viewvc?rev=1409618&view=rev
Log:
KAFKA-574 KafkaController unnecessarily reads leaderAndIsr info from ZK ; patched by Prashanth;
reviewed by Jun and Neha
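
The substance of the patch: ControllerContext.allLeaders changes from Map[TopicAndPartition, Int] (leader broker id only) to Map[TopicAndPartition, LeaderAndIsr], and the cache is updated on every successful ZooKeeper write, so the controller's state machines can consult the cache instead of re-reading the leader/ISR path from ZK. A minimal sketch of the shape of that change, using simplified stand-in case classes rather than the real kafka.api.LeaderAndIsr and kafka.common.TopicAndPartition:

    // Simplified stand-ins, not the committed Kafka classes.
    case class TopicAndPartition(topic: String, partition: Int)
    case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

    object AllLeadersCacheSketch {
      import scala.collection.mutable

      // Before this patch: only the leader broker id was cached per partition.
      val leadersOnly: mutable.Map[TopicAndPartition, Int] = mutable.Map.empty

      // After this patch: the full LeaderAndIsr is cached and kept current on ZK writes,
      // so answering "who leads this partition?" never requires a ZooKeeper read.
      val allLeaders: mutable.Map[TopicAndPartition, LeaderAndIsr] = mutable.Map.empty

      // Call sites that previously compared the cached Int now dereference .leader:
      def isLedBy(tp: TopicAndPartition, brokerId: Int): Boolean =
        allLeaders.get(tp).exists(_.leader == brokerId)
    }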

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala?rev=1409618&r1=1409617&r2=1409618&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala Thu Nov 15 00:28:28 2012
@@ -18,20 +18,20 @@ package kafka.controller
 
 import collection._
 import collection.immutable.Set
-import kafka.cluster.Broker
+import com.yammer.metrics.core.Gauge
+import java.lang.{IllegalStateException, Object}
+import java.util.concurrent.TimeUnit
+import kafka.admin.PreferredReplicaLeaderElectionCommand
 import kafka.api._
+import kafka.cluster.Broker
+import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
+import kafka.utils.{Utils, ZkUtils, Logging}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
-import java.util.concurrent.TimeUnit
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import com.yammer.metrics.core.Gauge
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
-import kafka.utils.{Utils, ZkUtils, Logging}
 import org.I0Itec.zkclient.exception.ZkNoNodeException
-import java.lang.{IllegalStateException, Object}
-import kafka.admin.PreferredReplicaLeaderElectionCommand
-import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
@@ -40,7 +40,7 @@ class ControllerContext(val zkClient: Zk
                         val brokerShutdownLock: Object = new Object,
                         var allTopics: Set[String] = Set.empty,
                         var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
-                        var allLeaders: mutable.Map[TopicAndPartition, Int] = mutable.Map.empty,
+                        var allLeaders: mutable.Map[TopicAndPartition, LeaderAndIsr] = mutable.Map.empty,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
                           new mutable.HashMap,
                         var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
@@ -128,33 +128,32 @@ class KafkaController(val config : Kafka
         trace("All leaders = " + controllerContext.allLeaders.mkString(","))
         controllerContext.allLeaders.filter {
           case (topicAndPartition, leader) =>
-            leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+            leader.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
         }.map(_._1)
       }
 
       val partitionsToMove = replicatedPartitionsBrokerLeads().toSet
       debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(",")))
 
-      partitionsToMove.foreach(topicAndPartition => {
+      partitionsToMove.foreach{ topicAndPartition =>
         val (topic, partition) = topicAndPartition.asTuple
         // move leadership serially to relinquish lock.
         controllerContext.controllerLock synchronized {
-          controllerContext.allLeaders.get(topicAndPartition).foreach(currLeader => {
-            if (currLeader == id) {
+          controllerContext.allLeaders.get(topicAndPartition).foreach{ currLeaderAndIsr =>
+            if (currLeaderAndIsr.leader == id) {
               partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                 controlledShutdownPartitionLeaderSelector)
-              val newLeader = controllerContext.allLeaders(topicAndPartition)
+              val newLeaderAndIsr = controllerContext.allLeaders(topicAndPartition)
 
               // mark replica offline only if leadership was moved successfully
-              if (newLeader != currLeader)
+              if (newLeaderAndIsr.leader != currLeaderAndIsr.leader)
                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica)
-            }
-            else
+            } else
               debug("Partition %s moved from leader %d to new leader %d during shutdown."
-                .format(topicAndPartition, id, currLeader))
-          })
+                .format(topicAndPartition, id, currLeaderAndIsr.leader))
+          }
         }
-      })
+      }
 
       /*
       * Force the shutting down broker out of the ISR of partitions that it
@@ -166,7 +165,7 @@ class KafkaController(val config : Kafka
       allPartitionsAndReplicationFactorOnBroker foreach {
         case(topicAndPartition, replicationFactor) =>
           val (topic, partition) = topicAndPartition.asTuple
-          if (controllerContext.allLeaders(topicAndPartition) != id) {
+          if (controllerContext.allLeaders(topicAndPartition).leader != id) {
            brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
             removeReplicaFromIsr(topic, partition, id) match {
               case Some(updatedLeaderAndIsr) =>
@@ -222,34 +221,42 @@ class KafkaController(val config : Kafka
   /**
   * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
    * brokers as input. It does the following -
-   * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update
-   *    leader and ISR for every partition as they take place
-   * 2. Triggers the OnlinePartition state change for all new/offline partitions
-   * 3. Invokes the OnlineReplica state change on the input list of newly started brokers
+   * 1. Triggers the OnlinePartition state change for all new/offline partitions
+   * 2. It checks whether there are reassigned replicas assigned to any newly started brokers.  If
+   *    so, it performs the reassignment logic for each topic/partition.
+   *
+   * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons:
+   * 1. The partition state machine, when triggering online state change, will refresh leader and ISR for only those
+   *    partitions currently new or offline (rather than every partition this controller is aware of)
+   * 2. Even if we do refresh the cache, there is no guarantee that by the time the leader and ISR request reaches
+   *    every broker that it is still valid.  Brokers check the leader epoch to determine validity of the request.
    */
   def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
 
-    // update leader and isr cache for broker
-    updateLeaderAndIsrCache()
+    val newBrokersSet = newBrokers.toSet
     // update partition state machine
     partitionStateMachine.triggerOnlinePartitionStateChange()
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers),
-      OnlineReplica)
+    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
+
     // check if reassignment of some partitions need to be restarted
-    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter(p =>
-      p._2.newReplicas.foldLeft(false)((a, replica) => newBrokers.contains(replica) || a))
+    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter{
+      case (topicAndPartition, reassignmentContext) =>
+        reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
+    }
     partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
   }
 
   /**
   * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers
    * as input. It does the following -
-   * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update
-   *    leader and ISR for every partition as they take place
-   * 2. Mark partitions with dead leaders offline
-   * 3. Triggers the OnlinePartition state change for all new/offline partitions
-   * 4. Invokes the OfflineReplica state change on the input list of newly started brokers
+   * 1. Mark partitions with dead leaders as offline
+   * 2. Triggers the OnlinePartition state change for all new/offline partitions
+   * 3. Invokes the OfflineReplica state change on the input list of newly started brokers
+   *
+   * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point.  This is because
+   * the partition state machine will refresh our cache for us when performing leader election for all new/offline
+   * partitions coming online.
    */
   def onBrokerFailure(deadBrokers: Seq[Int]) {
     info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
@@ -258,17 +265,15 @@ class KafkaController(val config : Kafka
       deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
     info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
 
-    // update leader and isr cache for broker
-    updateLeaderAndIsrCache()
+    val deadBrokersSet = deadBrokers.toSet
    // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
    val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
-      deadBrokers.contains(partitionAndLeader._2)).keySet
+      deadBrokersSet.contains(partitionAndLeader._2.leader)).keySet
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // handle dead replicas
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers),
-      OfflineReplica)
+    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
   }
 
   /**
@@ -391,9 +396,8 @@ class KafkaController(val config : Kafka
   private def initializeControllerContext() {
     controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
-    controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,
-      controllerContext.allTopics.toSeq)
-    controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, Int]
+    controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
+    controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, LeaderAndIsr]
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
     // start the channel manager
@@ -425,7 +429,7 @@ class KafkaController(val config : Kafka
     val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
     // check if they are already completed
    val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition =>
-      controllerContext.allLeaders(partition) == controllerContext.partitionReplicaAssignment(partition).head)
+      controllerContext.allLeaders(partition).leader == controllerContext.partitionReplicaAssignment(partition).head)
     controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
     info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
@@ -445,7 +449,7 @@ class KafkaController(val config : Kafka
      // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
       controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
         case true =>
-          controllerContext.allLeaders.put(topicPartition, leaderAndIsr.leader)
+          controllerContext.allLeaders.put(topicPartition, leaderAndIsr)
         case false =>
           debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderAndIsr.leader)
+
             "partition %s is dead, just ignore it".format(topicPartition))
@@ -465,14 +469,13 @@ class KafkaController(val config : Kafka
   private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
                                                      reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val currentLeader = controllerContext.allLeaders(topicAndPartition)
+    val currentLeader = controllerContext.allLeaders(topicAndPartition).leader
     if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
       info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition)
+
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
       // move the leader to one of the alive and caught up new replicas
       partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
-    }
-    else {
+    } else {
       // check if the leader is alive or not
       controllerContext.liveBrokerIds.contains(currentLeader) match {
         case true =>
@@ -565,14 +568,13 @@ class KafkaController(val config : Kafka
   }
 
  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
-    partitionsToBeRemoved.foreach { partition =>
+    for(partition <- partitionsToBeRemoved) {
       // check the status
-      val currentLeader = controllerContext.allLeaders(partition)
+      val currentLeader = controllerContext.allLeaders(partition).leader
       val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
       if(currentLeader == preferredReplica) {
         info("Partition %s completed preferred replica leader election. New leader is %d".format(partition,
preferredReplica))
-      }
-      else {
+      } else {
         warn("Partition %s failed to complete preferred replica leader election. Leader is
%d".format(partition, currentLeader))
       }
     }
@@ -608,23 +610,22 @@ class KafkaController(val config : Kafka
         case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
           if (leaderAndIsr.isr.contains(replicaId)) {
            val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
-                                               leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
+              leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
             // update the new leadership decision in zookeeper or retry
-            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
-              zkClient,
-              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
-              leaderAndIsr.zkVersion)
+            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(), leaderAndIsr.zkVersion)
             newLeaderAndIsr.zkVersion = newVersion
 
             finalLeaderAndIsr = Some(newLeaderAndIsr)
-            if (updateSucceeded)
-              info("New leader and ISR for partition [%s, %d] is %s"
-                   .format(topic, partition, newLeaderAndIsr.toString()))
+            if (updateSucceeded) {
+              // we've successfully written to ZK, let's refresh our cache
+              info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
+              controllerContext.allLeaders.put(topicAndPartition, newLeaderAndIsr)
+            }
             updateSucceeded
-          }
-          else {
+          } else {
             warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
-                 .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
+              .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
             finalLeaderAndIsr = Some(leaderAndIsr)
             true
           }
@@ -705,16 +706,14 @@ class PartitionsReassignedListener(contr
                 throw new KafkaException("Partition %s to be reassigned is already assigned
to replicas"
                   .format(topicAndPartition) +
                   " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
-              }
-              else {
+              } else {
                 if(aliveNewReplicas == newReplicas) {
                   info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition,
                     newReplicas.mkString(",")))
                   val context = createReassignmentContextForPartition(topic, partition, newReplicas)
                   controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
                   controller.onPartitionReassignment(topicAndPartition, context)
-                }
-                else {
+                } else {
                   // some replica in RAR is not alive. Fail partition reassignment
                   throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(","))
+
                     " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","),
topicAndPartition) +
@@ -724,7 +723,7 @@ class PartitionsReassignedListener(contr
            case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
               .format(topicAndPartition))
           }
-        }catch {
+        } catch {
           case e => error("Error completing reassignment of partition %s".format(topicAndPartition),
e)
           // remove the partition from the admin path to unblock the admin client
           controller.removePartitionFromReassignedPartitions(topicAndPartition)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala?rev=1409618&r1=1409617&r2=1409618&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala Thu Nov 15 00:28:28 2012
@@ -128,7 +128,7 @@ with Logging {
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
-    val currentLeader = controllerContext.allLeaders(topicAndPartition)
+    val currentLeader = controllerContext.allLeaders(topicAndPartition).leader
     if(currentLeader == preferredReplica) {
       throw new StateChangeFailedException("Preferred replica %d is already the current leader
for partition [%s,%d]"
         .format(preferredReplica, topic, partition))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala?rev=1409618&r1=1409617&r2=1409618&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala Thu Nov 15 00:28:28 2012
@@ -17,13 +17,13 @@
 package kafka.controller
 
 import collection._
+import collection.JavaConversions._
+import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
+import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
-import collection.JavaConversions._
-import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
 
 /**
 * This class represents the state machine for partitions. It defines the states that a partition can be in, and
@@ -44,7 +44,7 @@ class PartitionStateMachine(controller: 
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
-  private var isShuttingDown = new AtomicBoolean(false)
+  private val isShuttingDown = new AtomicBoolean(false)
 
   /**
   * Invoked on successful controller election. First registers a topic change listener since that triggers all
@@ -81,13 +81,12 @@ class PartitionStateMachine(controller: 
     try {
       brokerRequestBatch.newBatch()
      // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
-      partitionState.filter(partitionAndState =>
-        partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach {
-        partitionAndState => handleStateChange(partitionAndState._1.topic, partitionAndState._1.partition, OnlinePartition,
-                                               offlinePartitionSelector)
+      for((topicAndPartition, partitionState) <- partitionState) {
+        if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
+          handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector)
       }
       brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
-    }catch {
+    } catch {
       case e => error("Error while moving some partitions to the online state", e)
     }
   }
@@ -106,7 +105,7 @@ class PartitionStateMachine(controller: 
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
       brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
-    }catch {
+    } catch {
       case e => error("Error while moving some partitions to %s state".format(targetState),
e)
     }
   }
@@ -145,7 +144,7 @@ class PartitionStateMachine(controller: 
            case _ => // should never come here since illegal previous states are checked above
           }
           info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic,
partition,
-            partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition)))
+            partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leader))
           partitionState.put(topicAndPartition, OnlinePartition)
            // post: partition has a leader
         case OfflinePartition =>
@@ -162,7 +161,7 @@ class PartitionStateMachine(controller: 
           partitionState.put(topicAndPartition, NonExistentPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
-    }catch {
+    } catch {
      case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) +
         "from %s to %s failed".format(currState, targetState), t)
     }
@@ -241,9 +240,9 @@ class PartitionStateMachine(controller: 
           // GC pause
           brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
             topicAndPartition.partition, leaderAndIsr, replicaAssignment.size)
-          controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr.leader)
+          controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr)
           partitionState.put(topicAndPartition, OnlinePartition)
-        }catch {
+        } catch {
           case e: ZkNodeExistsException =>
             ControllerStat.offlinePartitionRate.mark()
             throw new StateChangeFailedException("Error while changing partition %s's state
from New to Online"
@@ -260,7 +259,7 @@ class PartitionStateMachine(controller: 
    * @param leaderSelector      Specific leader selector (e.g., offline/reassigned/etc.)
    */
  def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
-    /** handle leader election for the partitions whose leader is no longer alive **/
+    // handle leader election for the partitions whose leader is no longer alive
     info("Electing leader for partition [%s, %d]".format(topic, partition))
     try {
       var zookeeperPathUpdateSucceeded: Boolean = false
@@ -278,13 +277,12 @@ class PartitionStateMachine(controller: 
         replicasForThisPartition = replicas
       }
       // update the leader cache
-      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr.leader)
+      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr)
       info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader,
topic, partition))
-      // store new leader and isr info in cache
-      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition,
-                                                          topic, partition, newLeaderAndIsr,
+      // notify all replicas of the new leader
+      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr,
         controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
-    }catch {
+    } catch {
      case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
         .format(topic, partition) + " Marking this partition offline", poe)
      case sce => throw new StateChangeFailedException(("Error while electing leader for partition " +

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala?rev=1409618&r1=1409617&r2=1409618&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala Thu Nov 15 00:28:28 2012
@@ -17,11 +17,11 @@
 package kafka.controller
 
 import collection._
-import kafka.utils.{ZkUtils, Logging}
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
-import org.I0Itec.zkclient.IZkChildListener
 import kafka.common.{TopicAndPartition, StateChangeFailedException}
+import kafka.utils.{ZkUtils, Logging}
+import org.I0Itec.zkclient.IZkChildListener
 
 /**
 * This class represents the state machine for replicas. It defines the states that a replica can be in, and
@@ -113,7 +113,7 @@ class ReplicaStateMachine(controller: Ka
                 throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot
be moved to NewReplica"
                   .format(replicaId, topic, partition) + "state as it is being requested
to become leader")
               brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                                                                  topic, partition, leaderAndIsr, replicaAssignment.size)
+                topic, partition, leaderAndIsr, replicaAssignment.size)
            case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
@@ -137,16 +137,13 @@ class ReplicaStateMachine(controller: Ka
              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
               info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId,
topic, partition))
             case _ =>
-              // check if the leader for this partition is alive or even exists
-              // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper
-              // for the ISR anyways
-              val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-              leaderAndIsrOpt match {
+              // if the leader for this replica exists and is alive, send the leader and ISR
+              controllerContext.allLeaders.get(topicAndPartition) match {
                 case Some(leaderAndIsr) =>
                   controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
                     case true => // leader is alive
                       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                                                                          topic, partition, leaderAndIsr, replicaAssignment.size)
+                        topic, partition, leaderAndIsr, replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OnlineReplica)
                       info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId,
topic, partition))
                     case false => // ignore partitions whose leader is not alive
@@ -158,21 +155,17 @@ class ReplicaStateMachine(controller: Ka
         case OfflineReplica =>
          assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
           // As an optimization, the controller removes dead replicas from the ISR
-          val currLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-          val leaderAndIsrIsEmpty: Boolean = currLeaderAndIsrOpt match {
+          val leaderAndIsrIsEmpty = controllerContext.allLeaders.get(topicAndPartition) match {
             case Some(currLeaderAndIsr) =>
               if (currLeaderAndIsr.isr.contains(replicaId))
                 controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                   case Some(updatedLeaderAndIsr) =>
                     // send the shrunk ISR state change request only to the leader
                     brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderAndIsr.leader),
-                                                                        topic, partition, updatedLeaderAndIsr,
-                                                                        replicaAssignment.size)
+                      topic, partition, updatedLeaderAndIsr, replicaAssignment.size)
                     replicaState.put((topic, partition, replicaId), OfflineReplica)
-                    info("Replica %d for partition [%s, %d] state changed to OfflineReplica"
-                                 .format(replicaId, topic, partition))
-                    info("Removed offline replica %d from ISR for partition [%s, %d]"
-                                 .format(replicaId, topic, partition))
+                    info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
+                    info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
                     false
                   case None =>
                     true



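A second effect of the same cache change, sketched minimally with the same kind of simplified stand-in types (this is not the committed ReplicaStateMachine code): when bringing a replica online, the controller now looks the partition's leader up in its own cache and skips the replica when no live leader is known, rather than issuing a ZooKeeper read per replica state change.

    // Simplified stand-ins, not the committed Kafka classes.
    case class TopicAndPartition(topic: String, partition: Int)
    case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

    object ReplicaOnlineSketch {
      // Cache lookup in place of a per-replica ZooKeeper read for the leader/ISR path.
      def leaderForOnlineReplica(allLeaders: collection.Map[TopicAndPartition, LeaderAndIsr],
                                 liveBrokerIds: Set[Int],
                                 tp: TopicAndPartition): Option[LeaderAndIsr] =
        allLeaders.get(tp) match {
          case Some(leaderAndIsr) if liveBrokerIds.contains(leaderAndIsr.leader) =>
            Some(leaderAndIsr) // live leader cached: send it the LeaderAndIsr request
          case _ =>
            None               // no live leader known yet: skip until one is elected
        }
    }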