kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1403598 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/controller/ main/scala/kafka/message/ main/scala/kafka/server/ test/scala/unit/kafka/api/
Date Tue, 30 Oct 2012 01:28:02 GMT
Author: junrao
Date: Tue Oct 30 01:28:01 2012
New Revision: 1403598

URL: http://svn.apache.org/viewvc?rev=1403598&view=rev
Log:
Partition.makeFollower() reads broker info from ZK; patched by Swapnil Ghike; reviewed by
Jun Rao; KAFKA-575

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1403598&r1=1403597&r2=1403598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Tue
Oct 30 01:28:01 2012
@@ -23,6 +23,7 @@ import kafka.utils._
 import kafka.api.ApiUtils._
 import collection.mutable.Map
 import collection.mutable.HashMap
+import kafka.cluster.Broker
 
 
 object LeaderAndIsr {
@@ -92,7 +93,13 @@ object LeaderAndIsrRequest {
 
       partitionStateInfos.put((topic, partition), partitionStateInfo)
     }
-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos)
+
+    val leadersCount = buffer.getInt
+    var leaders = Set[Broker]()
+    for (i <- 0 until leadersCount)
+      leaders += Broker.readFrom(buffer)
+
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos, leaders)
   }
 }
 
@@ -100,11 +107,12 @@ object LeaderAndIsrRequest {
 case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
                                 ackTimeoutMs: Int,
-                                partitionStateInfos: Map[(String, Int), PartitionStateInfo])
+                                partitionStateInfos: Map[(String, Int), PartitionStateInfo],
+                                leaders: Set[Broker])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
partitionStateInfos)
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker])
= {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
partitionStateInfos, liveBrokers)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -117,12 +125,17 @@ case class LeaderAndIsrRequest (versionI
       buffer.putInt(key._2)
       value.writeTo(buffer)
     }
+    buffer.putInt(leaders.size)
+    leaders.foreach(_.writeTo(buffer))
   }
 
   def sizeInBytes(): Int = {
     var size = 1 + 2 + (2 + clientId.length) + 4 + 4
     for((key, value) <- partitionStateInfos)
       size += (2 + key._1.length) + 4 + value.sizeInBytes
+    size += 4
+    for(broker <- leaders)
+      size += broker.sizeInBytes
     size
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1403598&r1=1403597&r2=1403598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Tue Oct
30 01:28:01 2012
@@ -112,73 +112,72 @@ class Partition(val topic: String,
 
 
   /**
-   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make
it the new leader of follower to the new leader.
+   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make
the local replica the leader in the following steps.
+   *  1. stop the existing replica fetcher
+   *  2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in
ISR to be available)
+   *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the
time when this broker was the leader last time)
+   *  4. set the new leader and ISR
    */
-  def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, isMakingLeader:
Boolean): Boolean = {
+  def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr): Boolean =
{
     leaderIsrUpdateLock synchronized {
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch
[%d], discard the become %s request"
-          .format(leaderEpoch, leaderAndIsr.leaderEpoch, if(isMakingLeader) "leader" else
"follower"))
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch
[%d], discard the become leader request"
+          .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      if(isMakingLeader)
-        makeLeader(topic, partitionId, leaderAndIsr)
-      else
-        makeFollower(topic, partitionId, leaderAndIsr)
+      trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
+      // stop replica fetcher thread, if any
+      replicaFetcherManager.removeFetcher(topic, partitionId)
+
+      val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
+      // reset LogEndOffset for remote replicas
+      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset =
ReplicaManager.UnknownLogEndOffset)
+      inSyncReplicas = newInSyncReplicas
+      leaderEpoch = leaderAndIsr.leaderEpoch
+      zkVersion = leaderAndIsr.zkVersion
+      leaderReplicaIdOpt = Some(localBrokerId)
+      // we may need to increment high watermark since ISR could be down to 1
+      maybeIncrementLeaderHW(getReplica().get)
       true
     }
   }
 
   /**
-   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make
the local replica the leader in the following steps.
-   *  1. stop the existing replica fetcher
-   *  2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in
ISR to be available)
-   *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the
time when this broker was the leader last time)
-   *  4. set the new leader and ISR
-   */
-  private def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) {
-    trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
-    // stop replica fetcher thread, if any
-    replicaFetcherManager.removeFetcher(topic, partitionId)
-
-    val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
-    // reset LogEndOffset for remote replicas
-    assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
-    inSyncReplicas = newInSyncReplicas
-    leaderEpoch = leaderAndIsr.leaderEpoch
-    zkVersion = leaderAndIsr.zkVersion
-    leaderReplicaIdOpt = Some(localBrokerId)
-    // we may need to increment high watermark since ISR could be down to 1
-    maybeIncrementLeaderHW(getReplica().get)
-  }
-
-  /**
+   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make
the local replica the follower in the following steps.
    *  1. stop any existing fetcher on this partition from the local replica
    *  2. make sure local replica exists and truncate the log to high watermark
    *  3. set the leader and set ISR to empty
    *  4. start a fetcher to the new leader
    */
-  private def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) =
{
-    trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
-    val newLeaderBrokerId: Int = leaderAndIsr.leader
-    info("Starting the follower state transition to follow leader %d for topic %s partition
%d"
-      .format(newLeaderBrokerId, topic, partitionId))
-    ZkUtils.getBrokerInfo(zkClient, newLeaderBrokerId) match {
-      case Some(leaderBroker) =>
-        // stop fetcher thread to previous leader
-        replicaFetcherManager.removeFetcher(topic, partitionId)
-        // make sure local replica exists
-        val localReplica = getOrCreateReplica()
-        localReplica.log.get.truncateTo(localReplica.highWatermark)
-        inSyncReplicas = Set.empty[Replica]
-        leaderEpoch = leaderAndIsr.leaderEpoch
-        zkVersion = leaderAndIsr.zkVersion
-        leaderReplicaIdOpt = Some(newLeaderBrokerId)
-        // start fetcher thread to current leader
-        replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
-      case None => // leader went down
-        warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId,
newLeaderBrokerId) +
-        " topic %s partition %d became unavailble during the state change operation".format(topic,
partitionId))
+  def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, liveBrokers:
Set[Broker]): Boolean = {
+    leaderIsrUpdateLock synchronized {
+      if (leaderEpoch >= leaderAndIsr.leaderEpoch){
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch
[%d], discard the become follower request"
+          .format(leaderEpoch, leaderAndIsr.leaderEpoch))
+        return false
+      }
+      trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
+      val newLeaderBrokerId: Int = leaderAndIsr.leader
+      info("Starting the follower state transition to follow leader %d for topic %s partition
%d"
+        .format(newLeaderBrokerId, topic, partitionId))
+      liveBrokers.find(_.id == newLeaderBrokerId) match {
+        case Some(leaderBroker) =>
+          // stop fetcher thread to previous leader
+          replicaFetcherManager.removeFetcher(topic, partitionId)
+          // make sure local replica exists
+          val localReplica = getOrCreateReplica()
+          localReplica.log.get.truncateTo(localReplica.highWatermark)
+          inSyncReplicas = Set.empty[Replica]
+          leaderEpoch = leaderAndIsr.leaderEpoch
+          zkVersion = leaderAndIsr.zkVersion
+          leaderReplicaIdOpt = Some(newLeaderBrokerId)
+          // start fetcher thread to current leader
+          replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset,
leaderBroker)
+        case None => // leader went down
+          warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId,
newLeaderBrokerId) +
+          " topic %s partition %d became unavailable during the state change operation".format(topic,
partitionId))
+      }
+      true
     }
   }
 
@@ -325,4 +324,4 @@ class Partition(val topic: String,
     partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
     partitionString.toString()
   }
-}
\ No newline at end of file
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala?rev=1403598&r1=1403597&r2=1403598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
Tue Oct 30 01:28:01 2012
@@ -183,11 +183,13 @@ class ControllerBrokerRequestBatch(sendR
     }
   }
 
-  def sendRequestsToBrokers() {
+  def sendRequestsToBrokers(liveBrokers: Set[Broker]) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
       val partitionStateInfos = m._2
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos)
+      val leaderIds = partitionStateInfos.map(_._2.leaderAndIsr.leader).toSet
+      val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders)
       debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala?rev=1403598&r1=1403597&r2=1403598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
Tue Oct 30 01:28:01 2012
@@ -177,7 +177,7 @@ class KafkaController(val config : Kafka
             }
           }
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
 
       val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
       debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala?rev=1403598&r1=1403597&r2=1403598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
Tue Oct 30 01:28:01 2012
@@ -86,7 +86,7 @@ class PartitionStateMachine(controller: 
         partitionAndState => handleStateChange(partitionAndState._1.topic, partitionAndState._1.partition,
OnlinePartition,
                                                offlinePartitionSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to the online state", e)
     }
@@ -105,7 +105,7 @@ class PartitionStateMachine(controller: 
       partitions.foreach { topicAndPartition =>
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState,
leaderSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState),
e)
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala?rev=1403598&r1=1403597&r2=1403598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
Tue Oct 30 01:28:01 2012
@@ -83,7 +83,7 @@ class ReplicaStateMachine(controller: Ka
     try {
       brokerRequestBatch.newBatch()
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
-      brokerRequestBatch.sendRequestsToBrokers()
+      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState),
e)
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1403598&r1=1403597&r2=1403598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Tue Oct 30 01:28:01 2012
@@ -24,7 +24,6 @@ import java.nio.channels._
 import java.io.{InputStream, ByteArrayOutputStream, DataOutputStream}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.IteratorTemplate
-import kafka.common.InvalidMessageSizeException
 
 object ByteBufferMessageSet {
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1403598&r1=1403597&r2=1403598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Tue
Oct 30 01:28:01 2012
@@ -16,7 +16,7 @@
  */
 package kafka.server
 
-import kafka.cluster.{Partition, Replica}
+import kafka.cluster.{Broker, Partition, Replica}
 import collection._
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
@@ -173,7 +173,7 @@ class ReplicaManager(val config: KafkaCo
         if(requestedLeaderId == config.brokerId)
           makeLeader(topic, partitionId, partitionStateInfo)
         else
-          makeFollower(topic, partitionId, partitionStateInfo)
+          makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
       } catch {
         case e =>
           error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
@@ -201,7 +201,7 @@ class ReplicaManager(val config: KafkaCo
     val leaderAndIsr = partitionStateInfo.leaderAndIsr
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, true)) {
+    if (partition.makeLeader(topic, partitionId, leaderAndIsr)) {
       // also add this partition to the list of partitions for which the leader is the current
broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -210,14 +210,14 @@ class ReplicaManager(val config: KafkaCo
     info("Completed the leader state transition for topic %s partition %d".format(topic,
partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo)
{
+  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
liveBrokers: Set[Broker]) {
     val leaderAndIsr = partitionStateInfo.leaderAndIsr
     val leaderBrokerId: Int = leaderAndIsr.leader
     info("Starting the follower state transition to follow leader %d for topic %s partition
%d"
                  .format(leaderBrokerId, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, false)) {
+    if (partition.makeFollower(topic, partitionId, leaderAndIsr, liveBrokers)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala?rev=1403598&r1=1403597&r2=1403598&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
Tue Oct 30 01:28:01 2012
@@ -87,7 +87,7 @@ object SerializationTestUtils{
     val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map)
+    new LeaderAndIsrRequest(map, collection.immutable.Set[Broker]())
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {



Mime
View raw message