kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1375670 [1/2] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/sr...
Date Tue, 21 Aug 2012 17:17:30 GMT
Author: junrao
Date: Tue Aug 21 17:17:29 2012
New Revision: 1375670

URL: http://svn.apache.org/viewvc?rev=1375670&view=rev
Log:
refactor ReplicaManager; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-351

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/UnknownTopicOrPartitionException.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/InvalidPartitionException.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Tue Aug 21 17:17:29 2012
@@ -270,7 +270,7 @@ public class KafkaETLContext {
         } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
             throw new IOException(_input + " current offset=" + _offset
                     + " : invalid offset.");
-        } else if (errorCode == ErrorMapping.InvalidPartitionCode()) {
+        } else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
             throw new IOException(_input + " : wrong partition");
         } else if (errorCode != ErrorMapping.NoError()) {
             throw new IOException(_input + " current offset=" + _offset

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Tue Aug 21 17:17:29 2012
@@ -135,7 +135,7 @@ object AdminUtils extends Logging {
         new TopicMetadata(topic, partitionMetadata)
       } else {
         // topic doesn't exist, send appropriate error code
-        new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicCode)
+        new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
       }
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala Tue Aug 21 17:17:29 2012
@@ -79,7 +79,7 @@ object ListTopicCommand {
   def showTopic(topic: String, zkClient: ZkClient) {
     val topicMetaData = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
     topicMetaData.errorCode match {
-      case ErrorMapping.UnknownTopicCode =>
+      case ErrorMapping.UnknownTopicOrPartitionCode =>
         println("topic " + topic + " doesn't exist!")
       case _ =>
         println("topic: " + topic)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala Tue Aug 21 17:17:29 2012
@@ -21,7 +21,7 @@ import kafka.common.ErrorMapping
 import java.nio.ByteBuffer
 import kafka.utils.Utils
 import collection.mutable.HashMap
-import collection.mutable.Map
+import collection.Map
 
 
 object LeaderAndISRResponse {

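The import change above is small but deliberate: declaring the response map as collection.Map instead of collection.mutable.Map means callers can pass either a mutable or an immutable map, since both extend the read-only supertype. A minimal sketch of the idea (hypothetical names, not part of this commit):

    import scala.collection.mutable

    object MapVarianceSketch {
      // Accepts the read-only supertype, so both map flavors conform.
      def describe(responses: collection.Map[(String, Int), Short]): String =
        responses.map { case ((topic, part), code) =>
          "%s-%d -> %d".format(topic, part, code)
        }.mkString("; ")

      def main(args: Array[String]) {
        val m = new mutable.HashMap[(String, Int), Short]
        m.put(("test", 0), 0.toShort)
        println(describe(m))                              // a mutable map works
        println(describe(Map(("test", 1) -> 0.toShort)))  // so does an immutable one
      }
    }
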
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Tue Aug 21 17:17:29 2012
@@ -16,89 +16,227 @@
  */
 package kafka.cluster
 
-import java.util.concurrent.locks.ReentrantLock
 import scala.collection._
-import kafka.utils.{ZkUtils, SystemTime, Time}
-import kafka.common.{KafkaException, LeaderNotAvailableException}
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.Logging
+import kafka.utils._
+import java.lang.Object
+import kafka.api.LeaderAndISR
+import kafka.server.ReplicaManager
+import kafka.common.ErrorMapping
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
  */
 class Partition(val topic: String,
                 val partitionId: Int,
-                time: Time = SystemTime,
-                var inSyncReplicas: Set[Replica] = Set.empty[Replica]) extends Logging {
-  private var leaderReplicaId: Option[Int] = None
-  private var assignedReplicas: Set[Replica] = Set.empty[Replica]
-  private var highWatermarkUpdateTime: Long = -1L
-  private val leaderISRUpdateLock = new ReentrantLock()
-
-  def leaderId(newLeader: Option[Int] = None): Option[Int] = {
-    try {
-      leaderISRUpdateLock.lock()
-      if(newLeader.isDefined) {
-        info("Updating leader for topic %s partition %d to replica %d".format(topic, partitionId, newLeader.get))
-        leaderReplicaId = newLeader
-      }
-      leaderReplicaId
-    }finally {
-      leaderISRUpdateLock.unlock()
+                time: Time,
+                val replicaManager: ReplicaManager) extends Logging {
+  private val localBrokerId = replicaManager.config.brokerId
+  private val logManager = replicaManager.logManager
+  private val replicaFetcherManager = replicaManager.replicaFetcherManager
+  private val highwaterMarkCheckpoint = replicaManager.highWatermarkCheckpoint
+  private val zkClient = replicaManager.zkClient
+  var leaderReplicaIdOpt: Option[Int] = None
+  var inSyncReplicas: Set[Replica] = Set.empty[Replica]
+  private val assignedReplicaMap = new Pool[Int,Replica]
+  private val leaderISRUpdateLock = new Object
+
+  private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
+
+  def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
+    val replicaOpt = getReplica(replicaId)
+    replicaOpt match {
+      case Some(replica) => replica
+      case None =>
+        if (isReplicaLocal(replicaId)) {
+          val log = logManager.getOrCreateLog(topic, partitionId)
+          val localReplica = new Replica(replicaId, this, time,
+                                         highwaterMarkCheckpoint.read(topic, partitionId), Some(log))
+          addReplicaIfNotExists(localReplica)
+        }
+        else {
+          val remoteReplica = new Replica(replicaId, this, time)
+          addReplicaIfNotExists(remoteReplica)
+        }
+        getReplica(replicaId).get
     }
   }
 
-  def assignedReplicas(replicas: Option[Set[Replica]] = None): Set[Replica] = {
-    replicas match {
-      case Some(ar) =>
-        assignedReplicas = ar
-      case None =>
+  def getReplica(replicaId: Int = localBrokerId): Option[Replica] = {
+    val replica = assignedReplicaMap.get(replicaId)
+    if (replica == null)
+      None
+    else
+      Some(replica)
+  }
+
+  def leaderReplicaIfLocal(): Option[Replica] = {
+    leaderISRUpdateLock synchronized {
+      leaderReplicaIdOpt match {
+        case Some(leaderReplicaId) =>
+          if (leaderReplicaId == localBrokerId)
+            getReplica(localBrokerId)
+          else
+            None
+        case None => None
+      }
     }
-    assignedReplicas
   }
 
-  def getReplica(replicaId: Int): Option[Replica] = {
-    assignedReplicas().find(_.brokerId == replicaId)
+  def addReplicaIfNotExists(replica: Replica) = {
+    assignedReplicaMap.putIfNotExists(replica.brokerId, replica)
   }
 
-  def addReplica(replica: Replica): Boolean = {
-    if(!assignedReplicas.contains(replica)) {
-      assignedReplicas += replica
-      true
-    }else false
+  def assignedReplicas(): Set[Replica] = {
+    assignedReplicaMap.values.toSet
   }
 
-  def updateReplicaLeo(replica: Replica, leo: Long) {
-    replica.logEndOffset(Some(leo))
-    debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId))
+  /**
+   *  If the local replica is not the leader, make the local replica the leader in the following steps.
+   *  1. stop the existing replica fetcher
+   *  2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
+   *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
+   *  4. set the new leader and ISR
+   */
+  def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = {
+    leaderISRUpdateLock synchronized {
+      val shouldBecomeLeader = leaderReplicaIdOpt match {
+        case Some(leaderReplicaId) => !isReplicaLocal(leaderReplicaId)
+        case None => true
+      }
+      if (shouldBecomeLeader) {
+        info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
+        // stop replica fetcher thread, if any
+        replicaFetcherManager.removeFetcher(topic, partitionId)
+
+        val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet
+        // reset LogEndOffset for remote replicas
+        assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
+        inSyncReplicas = newInSyncReplicas
+        leaderReplicaIdOpt = Some(localBrokerId)
+        true
+      } else
+        false
+    }
   }
 
-  def leaderReplica(): Replica = {
-    val leaderReplicaId = leaderId()
-    if(leaderReplicaId.isDefined) {
-      val leaderReplica = assignedReplicas().find(_.brokerId == leaderReplicaId.get)
-      if(leaderReplica.isDefined) leaderReplica.get
-      else throw new KafkaException("No replica for leader %d in the replica manager"
-        .format(leaderReplicaId.get))
-    }else
-      throw new LeaderNotAvailableException("Leader for topic %s partition %d does not exist"
-        .format(topic, partitionId))
+  /**
+   *  If the local replica is not already following the new leader, make it follow the new leader.
+   *  1. stop any existing fetcher on this partition from the local replica
+   *  2. make sure local replica exists and truncate the log to high watermark
+   *  3. set the leader and set ISR to empty
+   *  4. start a fetcher to the new leader
+   */
+  def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = {
+    leaderISRUpdateLock synchronized  {
+      val newLeaderBrokerId: Int = leaderAndISR.leader
+      info("Starting the follower state transition to follow leader %d for topic %s partition %d"
+                   .format(newLeaderBrokerId, topic, partitionId))
+      val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(newLeaderBrokerId)).head
+      val currentLeaderBrokerIdOpt = replicaFetcherManager.fetcherSourceBroker(topic, partitionId)
+      // become follower only if it is not already following the same leader
+      val shouldBecomeFollower = currentLeaderBrokerIdOpt match {
+        case Some(currentLeaderBrokerId) => currentLeaderBrokerId != newLeaderBrokerId
+        case None => true
+      }
+      if(shouldBecomeFollower) {
+        info("Becoming follower to leader %d for topic %s partition %d".format(newLeaderBrokerId, topic, partitionId))
+        // stop fetcher thread to previous leader
+        replicaFetcherManager.removeFetcher(topic, partitionId)
+
+        // make sure local replica exists
+        val localReplica = getOrCreateReplica()
+        localReplica.log.get.truncateTo(localReplica.highWatermark)
+        inSyncReplicas = Set.empty[Replica]
+        leaderReplicaIdOpt = Some(newLeaderBrokerId)
+
+        // start fetcher thread to current leader
+        replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+        true
+      } else
+        false
+    }
   }
 
-  def leaderHW(newHw: Option[Long] = None): Long = {
-    newHw match {
-      case Some(highWatermark) =>
-        leaderReplica().highWatermark(newHw)
-        highWatermarkUpdateTime = time.milliseconds
-        highWatermark
-      case None =>
-        leaderReplica().highWatermark()
+  def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) {
+    leaderISRUpdateLock synchronized {
+      debug("Recording follower %d position %d for topic %s partition %d".format(replicaId, offset, topic, partitionId))
+      val replica = getOrCreateReplica(replicaId)
+      replica.logEndOffset = offset
+
+      // check if this replica needs to be added to the ISR
+      leaderReplicaIfLocal() match {
+        case Some(leaderReplica) =>
+          val replica = getReplica(replicaId).get
+          val leaderHW = leaderReplica.highWatermark
+          if (replica.logEndOffset >= leaderHW) {
+            // expand ISR
+            val newInSyncReplicas = inSyncReplicas + replica
+            info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
+            // update ISR in ZK and cache
+            updateISR(newInSyncReplicas)
+          }
+          maybeIncrementLeaderHW(leaderReplica)
+        case None => // nothing to do if no longer leader
+      }
     }
   }
 
-  def hwUpdateTime: Long = highWatermarkUpdateTime
+  def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
+    leaderISRUpdateLock synchronized {
+      leaderReplicaIfLocal() match {
+        case Some(_) =>
+          val numAcks = inSyncReplicas.count(r => {
+            if (!r.isLocal)
+              r.logEndOffset >= requiredOffset
+            else
+              true /* also count the local (leader) replica */
+          })
+          trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
+          if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
+              (requiredAcks > 0 && numAcks >= requiredAcks)) {
+            /*
+            * requiredAcks < 0 means acknowledge after all replicas in ISR
+            * are fully caught up to the (local) leader's offset
+            * corresponding to this produce request.
+            */
+            (true, ErrorMapping.NoError)
+          } else
+            (false, ErrorMapping.NoError)
+        case None =>
+          (false, ErrorMapping.NotLeaderForPartitionCode)
+      }
+    }
+  }
+  
+  private def maybeIncrementLeaderHW(leaderReplica: Replica) {
+    val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
+    val newHighWatermark = allLogEndOffsets.min
+    val oldHighWatermark = leaderReplica.highWatermark
+    if(newHighWatermark > oldHighWatermark)
+      leaderReplica.highWatermark = newHighWatermark
+    else
+      debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s"
+            .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
+  }
+
+  def maybeShrinkISR(replicaMaxLagTimeMs: Long,  replicaMaxLagBytes: Long) {
+    leaderISRUpdateLock synchronized {
+      leaderReplicaIfLocal() match {
+        case Some(leaderReplica) =>
+          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagBytes)
+          if(outOfSyncReplicas.size > 0) {
+            val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
+            assert(newInSyncReplicas.size > 0)
+            info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
+            // update ISR in zk and in cache
+            updateISR(newInSyncReplicas)
+          }
+        case None => // do nothing if no longer leader
+      }
+    }
+  }
 
-  def getOutOfSyncReplicas(keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = {
+  def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = {
     /**
      * there are two cases that need to be handled here -
      * 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated
@@ -106,43 +244,30 @@ class Partition(val topic: String,
      * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the
      *                     follower is not catching up and should be removed from the ISR
     **/
+    val leaderLogEndOffset = leaderReplica.logEndOffset
+    val candidateReplicas = inSyncReplicas - leaderReplica
     // Case 1 above
-    val possiblyStuckReplicas = inSyncReplicas.filter(r => r.logEndOffset() < leaderReplica().logEndOffset())
-    info("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId,
+    val possiblyStuckReplicas = candidateReplicas.filter(r => r.logEndOffset < leaderLogEndOffset)
+    debug("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId,
       possiblyStuckReplicas.map(_.brokerId).mkString(",")))
-    val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime < (time.milliseconds - keepInSyncTimeMs))
-    info("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
-    val leader = leaderReplica()
+    val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs))
+    debug("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
     // Case 2 above
-    val slowReplicas = inSyncReplicas.filter(r => (leader.logEndOffset() - r.logEndOffset()) > keepInSyncBytes)
-    info("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
+    val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncBytes)
+    debug("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
     stuckReplicas ++ slowReplicas
   }
 
-  def updateISR(newISR: Set[Int], zkClientOpt: Option[ZkClient] = None) {
-    try{
-      leaderISRUpdateLock.lock()
-      // update partition's ISR in cache
-      inSyncReplicas = newISR.map {r =>
-        getReplica(r) match {
-          case Some(replica) => replica
-          case None => throw new KafkaException("ISR update failed. No replica for id %d".format(r))
-        }
-      }
-      info("Updated ISR for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(",")))
-      if(zkClientOpt.isDefined){
-        val zkClient = zkClientOpt.get
-        val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId)
-        curLeaderAndISR match {
-          case None =>
-            throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId))
-          case Some(m) =>
-            m.ISR = newISR.toList
-            ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString)
-        }
-      }
-    } finally {
-      leaderISRUpdateLock.unlock()
+  private def updateISR(newISR: Set[Replica]) {
+    info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(",")))
+    inSyncReplicas = newISR
+    val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId)
+    curLeaderAndISR match {
+      case None =>
+        throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId))
+      case Some(m) =>
+         m.ISR = newISR.map(r => r.brokerId).toList
+         ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString)
     }
   }
 
@@ -163,8 +288,8 @@ class Partition(val topic: String,
     val partitionString = new StringBuilder
     partitionString.append("Topic: " + topic)
     partitionString.append("; Partition: " + partitionId)
-    partitionString.append("; Leader: " + leaderId())
-    partitionString.append("; Assigned replicas: " + assignedReplicas().map(_.brokerId).mkString(","))
+    partitionString.append("; Leader: " + leaderReplicaIdOpt)
+    partitionString.append("; Assigned replicas: " + assignedReplicaMap.keys.mkString(","))
     partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
     partitionString.toString()
   }

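The two computations at the heart of the rewritten Partition are easy to state on plain numbers: maybeIncrementLeaderHW advances the leader's high watermark to the minimum log end offset across the ISR, and checkEnoughReplicasReachOffset declares a produce request satisfied once enough ISR members have reached its offset (requiredAcks < 0 meaning the whole ISR). A self-contained sketch of that arithmetic, with replicas reduced to (brokerId -> logEndOffset) pairs and no locking:

    object IsrMathSketch {
      // High watermark: the slowest in-sync replica bounds what is committed.
      def highWatermark(isrLogEndOffsets: Map[Int, Long]): Long =
        isrLogEndOffsets.values.min

      def enoughAcks(isrLogEndOffsets: Map[Int, Long], requiredOffset: Long,
                     requiredAcks: Int, leaderId: Int): Boolean = {
        val numAcks = isrLogEndOffsets.count { case (brokerId, leo) =>
          brokerId == leaderId || leo >= requiredOffset  // the leader always counts
        }
        // Mirrors the condition in checkEnoughReplicasReachOffset above.
        (requiredAcks < 0 && numAcks >= isrLogEndOffsets.size) ||
        (requiredAcks > 0 && numAcks >= requiredAcks)
      }

      def main(args: Array[String]) {
        val isr = Map(0 -> 10L, 1 -> 8L, 2 -> 5L)  // broker 0 is the local leader
        println(highWatermark(isr))            // 5: slowest ISR member bounds the hw
        println(enoughAcks(isr, 8L, 2, 0))     // true: leader plus broker 1
        println(enoughAcks(isr, 8L, -1, 0))    // false: broker 2 hasn't reached 8 yet
      }
    }
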
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Tue Aug 21 17:17:29 2012
@@ -20,36 +20,41 @@ package kafka.cluster
 import kafka.log.Log
 import kafka.utils.{SystemTime, Time, Logging}
 import kafka.common.KafkaException
+import kafka.server.ReplicaManager
+import java.util.concurrent.atomic.AtomicLong
 
 class Replica(val brokerId: Int,
               val partition: Partition,
-              val topic: String,
               time: Time = SystemTime,
-              var hw: Option[Long] = None,
-              var log: Option[Log] = None) extends Logging {
-  private var logEndOffset: Long = -1L
-  private var logEndOffsetUpdateTimeMs: Long = -1L
-
-  def logEndOffset(newLeo: Option[Long] = None): Long = {
-    isLocal match {
-      case true =>
-        newLeo match {
-          case Some(newOffset) => logEndOffsetUpdateTimeMs = time.milliseconds; newOffset
-          case None => log.get.logEndOffset
-        }
-      case false =>
-        newLeo match {
-          case Some(newOffset) =>
-            logEndOffset = newOffset
-            logEndOffsetUpdateTimeMs = time.milliseconds
-            trace("Setting log end offset for replica %d for topic %s partition %d to %d"
-              .format(brokerId, topic, partition.partitionId, logEndOffset))
-            logEndOffset
-          case None => logEndOffset
-        }
-    }
+              initialHighWatermarkValue: Long = 0L,
+              val log: Option[Log] = None) extends Logging {
+  //only defined in local replica
+  private[this] var highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue)
+  // only used for remote replica; logEndOffsetValue for local replica is kept in log
+  private[this] var logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset)
+  private[this] var logEndOffsetUpdateTimeMsValue: AtomicLong = new AtomicLong(time.milliseconds)
+  val topic = partition.topic
+  val partitionId = partition.partitionId
+
+  def logEndOffset_=(newLogEndOffset: Long) = {
+    if (!isLocal) {
+      logEndOffsetValue.set(newLogEndOffset)
+      logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
+      trace("Setting log end offset for replica %d for topic %s partition %d to %d"
+            .format(brokerId, topic, partitionId, logEndOffsetValue))
+    } else
+      throw new KafkaException("Shouldn't set logEndOffset for replica %d topic %s partition %d since it's local"
+          .format(brokerId, topic, partitionId))
+
   }
 
+  def logEndOffset = {
+    if (isLocal)
+      log.get.logEndOffset
+    else
+      logEndOffsetValue.get()
+  }
+  
   def isLocal: Boolean = {
     log match {
       case Some(l) => true
@@ -57,34 +62,24 @@ class Replica(val brokerId: Int,
     }
   }
 
-  def logEndOffsetUpdateTime = logEndOffsetUpdateTimeMs
+  def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
 
-  def highWatermark(highwaterMarkOpt: Option[Long] = None): Long = {
-    highwaterMarkOpt match {
-      case Some(highwaterMark) =>
-        isLocal match {
-          case true =>
-            trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId,
-                                                                                   brokerId, highwaterMark))
-            hw = Some(highwaterMark)
-            highwaterMark
-          case false => throw new KafkaException("Unable to set highwatermark for topic %s ".format(topic) +
-            "partition %d on broker %d, since there is no local log for this partition"
-              .format(partition.partitionId, brokerId))
-        }
-      case None =>
-        isLocal match {
-          case true =>
-            hw match {
-              case Some(highWatermarkValue) => highWatermarkValue
-              case None => throw new KafkaException("HighWatermark does not exist for topic %s ".format(topic) +
-              " partition %d on broker %d but local log exists".format(partition.partitionId, brokerId))
-            }
-          case false => throw new KafkaException("Unable to get highwatermark for topic %s ".format(topic) +
-            "partition %d on broker %d, since there is no local log for this partition"
-              .format(partition.partitionId, brokerId))
-        }
-    }
+  def highWatermark_=(newHighWatermark: Long) {
+    if (isLocal) {
+      trace("Setting hw for replica %d topic %s partition %d on broker %d to %d"
+              .format(brokerId, topic, partitionId, newHighWatermark))
+      highWatermarkValue.set(newHighWatermark)
+    } else
+      throw new KafkaException("Unable to set highwatermark for replica %d topic %s partition %d since it's not local"
+              .format(brokerId, topic, partitionId))
+  }
+
+  def highWatermark = {
+    if (isLocal)
+      highWatermarkValue.get()
+    else
+      throw new KafkaException("Unable to get highwatermark for replica %d topic %s partition %d since it's not local"
+              .format(brokerId, topic, partitionId))
   }
 
   override def equals(that: Any): Boolean = {
@@ -107,7 +102,7 @@ class Replica(val brokerId: Int,
     replicaString.append("; Topic: " + topic)
     replicaString.append("; Partition: " + partition.toString)
     replicaString.append("; isLocal: " + isLocal)
-    if(isLocal) replicaString.append("; Highwatermark: " + highWatermark())
+    if(isLocal) replicaString.append("; Highwatermark: " + highWatermark)
     replicaString.toString()
   }
 }

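Replica's new accessors use Scala's `_=` naming convention: defining both logEndOffset and logEndOffset_= makes plain assignment (`r.logEndOffset = offset`, as used in Partition above) dispatch to the guarded setter, while AtomicLong keeps reads and writes thread-safe without a lock. (One nit in the hunk above: the trace call in highWatermark_= has five format specifiers but only four arguments.) A minimal sketch of the pattern on a hypothetical class:

    import java.util.concurrent.atomic.AtomicLong

    class OffsetHolder {
      private[this] val logEndOffsetValue = new AtomicLong(-1L)

      // Defining foo and foo_= together enables `holder.logEndOffset = 42L`.
      def logEndOffset_=(newOffset: Long) { logEndOffsetValue.set(newOffset) }
      def logEndOffset: Long = logEndOffsetValue.get()
    }

    object OffsetHolderSketch {
      def main(args: Array[String]) {
        val holder = new OffsetHolder
        holder.logEndOffset = 42L    // sugar for holder.logEndOffset_=(42L)
        println(holder.logEndOffset) // 42
      }
    }
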
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Tue Aug 21 17:17:29 2012
@@ -32,27 +32,25 @@ object ErrorMapping {
   val NoError : Short = 0
   val OffsetOutOfRangeCode : Short = 1
   val InvalidMessageCode : Short = 2
-  val InvalidPartitionCode : Short = 3
+  val UnknownTopicOrPartitionCode : Short = 3
   val InvalidFetchSizeCode  : Short = 4
   val InvalidFetchRequestFormatCode : Short = 5
   val LeaderNotAvailableCode : Short = 6
   val NotLeaderForPartitionCode : Short = 7
-  val UnknownTopicCode : Short = 8
-  val RequestTimedOutCode: Short = 9
-  val BrokerNotExistInZookeeperCode: Short = 10
-  val ReplicaNotAvailableCode: Short = 11
+  val RequestTimedOutCode: Short = 8
+  val BrokerNotExistInZookeeperCode: Short = 9
+  val ReplicaNotAvailableCode: Short = 10
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
       classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
-      classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> InvalidPartitionCode,
+      classOf[UnknownTopicOrPartitionException].asInstanceOf[Class[Throwable]] -> UnknownTopicOrPartitionCode,
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
       classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
       classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
-      classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode,
       classOf[BrokerNotExistException].asInstanceOf[Class[Throwable]] -> BrokerNotExistInZookeeperCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode
     ).withDefaultValue(UnknownCode)

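Note that folding UnknownTopicCode (8) into the old InvalidPartitionCode slot (3) also renumbers RequestTimedOutCode, BrokerNotExistInZookeeperCode and ReplicaNotAvailableCode down by one, so the wire values change with this commit. The lookup pattern itself is an immutable map from exception class to code, with a default for anything unmapped so the lookup never throws; a small self-contained sketch (hypothetical exceptions and codes):

    object CodeLookupSketch {
      val UnknownCode: Short = -1
      val exceptionToCode = Map[Class[_ <: Throwable], Short](
        classOf[IllegalStateException] -> 1.toShort,
        classOf[java.io.IOException]   -> 2.toShort
      ).withDefaultValue(UnknownCode)

      def codeFor(t: Throwable): Short = exceptionToCode(t.getClass)

      def main(args: Array[String]) {
        println(codeFor(new java.io.IOException("boom")))  // 2
        println(codeFor(new RuntimeException("unmapped"))) // -1, via the default
      }
    }
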
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/UnknownTopicOrPartitionException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/UnknownTopicOrPartitionException.scala?rev=1375670&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/UnknownTopicOrPartitionException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/UnknownTopicOrPartitionException.scala Tue Aug 21 17:17:29 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * Indicates an unknown topic or a partition id not between 0 and numPartitions-1
+ */
+class UnknownTopicOrPartitionException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

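This class replaces both InvalidPartitionException (removed above) and UnknownTopicException. Since it extends RuntimeException it can propagate unchecked up to the request handler, where ErrorMapping translates it to UnknownTopicOrPartitionCode. A brief usage sketch (hypothetical call site, mirroring the LogManager change that follows):

    object UnknownTopicOrPartitionSketch {
      class UnknownTopicOrPartitionException(message: String) extends RuntimeException(message) {
        def this() = this(null)
      }

      def checkPartitionId(partition: Int, numPartitions: Int) {
        if (partition < 0 || partition >= numPartitions)
          throw new UnknownTopicOrPartitionException(
            "Wrong partition %d, valid partitions (0, %d).".format(partition, numPartitions - 1))
      }

      def main(args: Array[String]) {
        try checkPartitionId(5, 3)
        catch { case e: UnknownTopicOrPartitionException => println("caught: " + e.getMessage) }
      }
    }
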
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Tue Aug 21 17:17:29 2012
@@ -23,7 +23,7 @@ import scala.collection._
 import kafka.server.KafkaConfig
 import kafka.api.OffsetRequest
 import kafka.log.Log._
-import kafka.common.{KafkaException, InvalidTopicException, InvalidPartitionException}
+import kafka.common.{KafkaException, InvalidTopicException, UnknownTopicOrPartitionException}
 
 /**
  * The guy who creates and hands out logs
@@ -96,7 +96,7 @@ private[kafka] class LogManager(val conf
       val error = "Wrong partition %d, valid partitions (0, %d)."
               .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
       warn(error)
-      throw new InvalidPartitionException(error)
+      throw new UnknownTopicOrPartitionException(error)
     }
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Tue Aug 21 17:17:29 2012
@@ -21,7 +21,6 @@ import kafka.api.{TopicMetadataRequest, 
 import kafka.common.KafkaException
 import kafka.utils.{Logging, Utils}
 import kafka.common.ErrorMapping
-import kafka.cluster.{Replica, Partition}
 
 
 class BrokerPartitionInfo(producerConfig: ProducerConfig,
@@ -37,7 +36,7 @@ class BrokerPartitionInfo(producerConfig
    * @return a sequence of (brokerId, numPartitions). Returns a zero-length
    * sequence if no brokers are available.
    */
-  def getBrokerPartitionInfo(topic: String): Seq[Partition] = {
+  def getBrokerPartitionInfo(topic: String): Seq[PartitionAndLeader] = {
     debug("Getting broker partition info for topic %s".format(topic))
     // check if the cache has metadata for this topic
     val topicMetadata = topicPartitionInfo.get(topic)
@@ -55,16 +54,13 @@ class BrokerPartitionInfo(producerConfig
       }
     val partitionMetadata = metadata.partitionsMetadata
     partitionMetadata.map { m =>
-      val partition = new Partition(topic, m.partitionId)
       m.leader match {
         case Some(leader) =>
-          val leaderReplica = new Replica(leader.id, partition, topic)
-          partition.leaderId(Some(leaderReplica.brokerId))
           debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id))
-          partition
+          new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
         case None =>
           debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId))
-          partition
+          new PartitionAndLeader(topic, m.partitionId, None)
       }
     }.sortWith((s, t) => s.partitionId < t.partitionId)
   }
@@ -113,3 +109,5 @@ class BrokerPartitionInfo(producerConfig
     }
   }
 }
+
+case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

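The producer no longer constructs broker-side Partition and Replica objects just to remember a leader id; the new PartitionAndLeader case class carries only what the send path needs. A short usage sketch (the case class as added above; the demo code is hypothetical):

    case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

    object PartitionAndLeaderSketch {
      def main(args: Array[String]) {
        val partitions = Seq(
          PartitionAndLeader("test", 1, Some(2)),
          PartitionAndLeader("test", 0, None))
        // The same ordering getBrokerPartitionInfo applies before partitioning.
        val sorted = partitions.sortWith((s, t) => s.partitionId < t.partitionId)
        // A missing leader becomes broker id -1, postponing failure to the send.
        sorted.foreach(p => println(p.partitionId + " -> " + p.leaderBrokerIdOpt.getOrElse(-1)))
      }
    }
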
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Tue Aug 21 17:17:29 2012
@@ -17,7 +17,6 @@
 
 package kafka.producer.async
 
-import kafka.cluster.Partition
 import kafka.common._
 import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
 import kafka.producer._
@@ -105,7 +104,7 @@ class DefaultEventHandler[K,V](config: P
         val brokerPartition = topicPartitionsList(partitionIndex)
 
         // postpone the failure until the send operation, so that requests for other brokers are handled correctly
-        val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
+        val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
 
         var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
         ret.get(leaderBrokerId) match {
@@ -135,7 +134,7 @@ class DefaultEventHandler[K,V](config: P
     }
   }
 
-  private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = {
+  private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[PartitionAndLeader] = {
     debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
     val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
     debug("Broker partitions registered for topic: %s are %s"
@@ -146,7 +145,7 @@ class DefaultEventHandler[K,V](config: P
   }
 
   /**
-   * Retrieves the partition id and throws an InvalidPartitionException if
+   * Retrieves the partition id and throws an UnknownTopicOrPartitionException if
    * the value of partition is not between 0 and numPartitions-1
    * @param key the partition key
    * @param numPartitions the total number of available partitions
@@ -154,12 +153,12 @@ class DefaultEventHandler[K,V](config: P
    */
   private def getPartition(key: K, numPartitions: Int): Int = {
     if(numPartitions <= 0)
-      throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
+      throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
         "\n Valid values are > 0")
     val partition = if(key == null) Utils.getNextRandomInt(numPartitions)
     else partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
-      throw new InvalidPartitionException("Invalid partition id : " + partition +
+      throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
         "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
     partition
   }

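getPartition's contract is spelled out in its doc comment above: a null key draws a random partition, keyed messages go through the partitioner, and any result outside [0, numPartitions-1] is rejected. A runnable sketch of that contract (simplified; a modulo-hash partitioner is assumed in place of the pluggable one, and the exception is a stand-in):

    import scala.util.Random

    object GetPartitionSketch {
      class InvalidPartitionIdException(msg: String) extends RuntimeException(msg) // stand-in

      def getPartition(key: String, numPartitions: Int): Int = {
        if (numPartitions <= 0)
          throw new InvalidPartitionIdException("Invalid number of partitions: " + numPartitions)
        val partition =
          if (key == null) Random.nextInt(numPartitions)
          else ((key.hashCode % numPartitions) + numPartitions) % numPartitions
        if (partition < 0 || partition >= numPartitions)
          throw new InvalidPartitionIdException("Invalid partition id: " + partition)
        partition
      }

      def main(args: Array[String]) {
        println(getPartition("my-key", 4)) // deterministic for a given key
        println(getPartition(null, 4))     // random in [0, 4)
      }
    }
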
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Tue Aug 21 17:17:29 2012
@@ -18,32 +18,27 @@
 package kafka.server
 
 import java.io.IOException
-import java.util.concurrent.TimeUnit
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
 import kafka.common._
-import kafka.log._
 import kafka.message._
 import kafka.network._
+import kafka.utils.{Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
 import scala.math._
 import kafka.network.RequestChannel.Response
-import kafka.cluster.Replica
+import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.{Pool, ZkUtils, SystemTime, Logging}
-
+import org.I0Itec.zkclient.ZkClient
 
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
-                val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper,
-                addReplicaCbk: (String, Int, Set[Int]) => Replica,
-                stopReplicaCbk: (String, Int) => Short,
-                becomeLeader: (Replica, LeaderAndISR) => Short,
-                becomeFollower: (Replica, LeaderAndISR) => Short,
+class KafkaApis(val requestChannel: RequestChannel,
+                val replicaManager: ReplicaManager,
+                val zkClient: ZkClient,
                 brokerId: Int) extends Logging {
 
   private val metricsGroup = brokerId.toString
@@ -70,52 +65,13 @@ class KafkaApis(val requestChannel: Requ
     }
   }
 
-
   def handleLeaderAndISRRequest(request: RequestChannel.Request){
-    val responseMap = new HashMap[(String, Int), Short]
     val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
     trace("Handling leader and isr request " + leaderAndISRRequest)
 
-    for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
-      var errorCode = ErrorMapping.NoError
-      val topic = partitionInfo._1
-      val partition = partitionInfo._2
-
-      // If the partition does not exist locally, create it
-      if(replicaManager.getPartition(topic, partition) == None){
-        trace("The partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition))
-        val assignedReplicas = ZkUtils.getReplicasForPartition(kafkaZookeeper.getZookeeperClient, topic, partition)
-        trace("Assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString))
-        if(assignedReplicas.contains(brokerId)) {
-          val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
-          info("Starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
-        }
-      }
-      val replica = replicaManager.getReplica(topic, partition).get
-      // The command ask this broker to be new leader for P and it isn't the leader yet
-      val requestedLeaderId = leaderAndISR.leader
-      // If the broker is requested to be the leader and it's not current the leader (the leader id is set and not equal to broker id)
-      if(requestedLeaderId == brokerId && (!replica.partition.leaderId().isDefined || replica.partition.leaderId().get != brokerId)){
-        info("Becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
-        errorCode = becomeLeader(replica, leaderAndISR)
-      }
-      else if (requestedLeaderId != brokerId) {
-        info("Becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
-        errorCode = becomeFollower(replica, leaderAndISR)
-      }
-
-      responseMap.put(partitionInfo, errorCode)
-    }
-
-    if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){
-      replicaManager.startHighWaterMarksCheckPointThread
-      val partitionsToRemove = replicaManager.allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).keySet
-      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
-      partitionsToRemove.foreach(p => stopReplicaCbk(p._1, p._2))
-    }
-
+    val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest)
     val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse)))
   }
@@ -129,9 +85,9 @@ class KafkaApis(val requestChannel: Requ
 
     val responseMap = new HashMap[(String, Int), Short]
 
-    for((topic, partition) <- stopReplicaRequest.stopReplicaSet){
-      val errorCode = stopReplicaCbk(topic, partition)
-      responseMap.put((topic, partition), errorCode)
+    for((topic, partitionId) <- stopReplicaRequest.stopReplicaSet){
+      val errorCode = replicaManager.stopReplica(topic, partitionId)
+      responseMap.put((topic, partitionId), errorCode)
     }
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
@@ -225,10 +181,9 @@ class KafkaApis(val requestChannel: Requ
         BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
         BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
         try {
-          kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
-          val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
+          val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
+          val log = localReplica.log.get
           log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
-          replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset)
           offsets(msgIndex) = log.logEndOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
           trace("%d bytes written to logs, nextAppendOffset = %d"
@@ -313,14 +268,14 @@ class KafkaApis(val requestChannel: Requ
     for(offsetDetail <- fetchRequest.offsetInfo) {
       for(i <- 0 until offsetDetail.partitions.size) {
         try {
-          val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
-          val available = maybeLog match {
-            case Some(log) => max(0, log.logEndOffset - offsetDetail.offsets(i))
+          val localReplica = replicaManager.getReplica(offsetDetail.topic, offsetDetail.partitions(i))
+          val available = localReplica match {
+            case Some(replica) => max(0, replica.log.get.logEndOffset - offsetDetail.offsets(i))
             case None => 0
           }
           totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
-          case e: InvalidPartitionException =>
+          case e: UnknownTopicOrPartitionException =>
             info("Invalid partition %d in fetch request from client %d."
               .format(offsetDetail.partitions(i), fetchRequest.clientId))
         }
@@ -337,8 +292,7 @@ class KafkaApis(val requestChannel: Requ
       val topic = offsetDetail.topic
       val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
       for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) {
-        replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset,
-                                              kafkaZookeeper.getZookeeperClient)
+        replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
       }
     }
   }
@@ -354,7 +308,7 @@ class KafkaApis(val requestChannel: Requ
       val info = new mutable.ArrayBuffer[PartitionData]()
       val topic = offsetDetail.topic
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
-      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ){
+      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
         val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
           case Left(err) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
@@ -367,23 +321,14 @@ class KafkaApis(val requestChannel: Requ
           case Right(messages) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
             BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
-            val leaderReplicaOpt = replicaManager.getReplica(topic, partition, brokerId)
-            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d must exist on leader broker %d".format(topic, partition, brokerId))
-            val leaderReplica = leaderReplicaOpt.get
-            fetchRequest.replicaId match {
-              case FetchRequest.NonFollowerId =>
-               // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
-                new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
-              case _ => // fetch request from a follower
-                val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
-                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, brokerId))
-                val replica = replicaOpt.get
-                debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]"
-                  .format(replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
-                debug("Leader returning %d messages for topic %s partition %d to follower %d"
-                  .format(messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
-                new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
+            val leaderReplica = replicaManager.getReplica(topic, partition).get
+            if (fetchRequest.replicaId != FetchRequest.NonFollowerId) {
+              debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]"
+                .format(topic, partition, fetchRequest.replicaId))
+              debug("Leader returning %d messages for topic %s partition %d to follower %d"
+                .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
             }
+            new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages)
         }
         info.append(partitionInfo)
       }
@@ -399,10 +344,10 @@ class KafkaApis(val requestChannel: Requ
     var response: Either[Short, MessageSet] = null
     try {
       // check if the current broker is the leader for the partitions
-      kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
+      val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partition)
       trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-      val log = logManager.getLog(topic, partition)
-      response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
+      val log = localReplica.log.get
+      response = Right(log.read(offset, maxSize))
     } catch {
       case e =>
         error("error when processing request " + (topic, partition, offset, maxSize), e)
@@ -422,8 +367,9 @@ class KafkaApis(val requestChannel: Requ
 
     var response: OffsetResponse = null
     try {
-      kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition)
-      val offsets = logManager.getOffsets(offsetRequest)
+      // ensure leader exists
+      replicaManager.getLeaderReplicaIfLocal(offsetRequest.topic, offsetRequest.partition)
+      val offsets = replicaManager.logManager.getOffsets(offsetRequest)
       response = new OffsetResponse(offsetRequest.versionId, offsets)
     }catch {
       case ioe: IOException =>
@@ -446,10 +392,8 @@ class KafkaApis(val requestChannel: Requ
     trace("Handling topic metadata request " + metadataRequest.toString())
 
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
-    val zkClient = kafkaZookeeper.getZookeeperClient
     var errorCode = ErrorMapping.NoError
-    val config = logManager.config
-
+    val config = replicaManager.config
     try {
       val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
       metadataRequest.topics.zip(topicMetadataList).foreach(
@@ -457,7 +401,7 @@ class KafkaApis(val requestChannel: Requ
           val topic = topicAndMetadata._1
           topicAndMetadata._2.errorCode match {
             case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
-            case ErrorMapping.UnknownTopicCode =>
+            case ErrorMapping.UnknownTopicOrPartitionCode =>
               /* check if auto creation of topics is turned on */
               if(config.autoCreateTopics) {
                 CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
@@ -604,51 +548,22 @@ class KafkaApis(val requestChannel: Requ
       trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
         .format(topic, partitionId, fetchPartitionStatus.acksPending))
       if (fetchPartitionStatus.acksPending) {
-        val leaderReplica = replicaManager.getLeaderReplica(topic, partitionId)
-        leaderReplica match {
-          case Some(leader) => {
-            if (leader.isLocal) {
-              val isr = leader.partition.inSyncReplicas
-              val numAcks = isr.count(r => {
-                if (!r.isLocal) {
-                  r.logEndOffset() >= partitionStatus(key).requiredOffset
-                }
-                else
-                  true /* also count the local (leader) replica */
-              })
-
-              trace("Received %d/%d acks for producer request to %s-%d; isr size = %d".format(
-                numAcks, produce.requiredAcks,
-                topic, partitionId, isr.size))
-              if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
-                      (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
-                /*
-                 * requiredAcks < 0 means acknowledge after all replicas in ISR
-                 * are fully caught up to the (local) leader's offset
-                 * corresponding to this produce request.
-                 */
-
-                fetchPartitionStatus.acksPending = false
-                fetchPartitionStatus.error = ErrorMapping.NoError
-                val topicData =
-                  produce.data.find(_.topic == topic).get
-                val partitionData =
-                  topicData.partitionDataArray.find(_.partition == partitionId).get
-                delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
-                                                                       durationNs,
-                                                                       partitionData.sizeInBytes)
-                maybeUnblockDelayedFetchRequests(
-                  topic, Array(partitionData))
-              }
-            }
-            else {
-              debug("Broker not leader for %s-%d".format(topic, partitionId))
-              fetchPartitionStatus.setThisBrokerNotLeader()
-            }
-          }
-          case None =>
-            debug("Broker not leader for %s-%d".format(topic, partitionId))
-            fetchPartitionStatus.setThisBrokerNotLeader()
+        val partition = replicaManager.getOrCreatePartition(topic, partitionId)
+        val (hasEnough, errorCode) = partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
+        if (errorCode != ErrorMapping.NoError) {
+          fetchPartitionStatus.acksPending = false
+          fetchPartitionStatus.error = errorCode
+        } else if (hasEnough) {
+          fetchPartitionStatus.acksPending = false
+          fetchPartitionStatus.error = ErrorMapping.NoError
+        }
+        if (!fetchPartitionStatus.acksPending) {
+          val topicData = produce.data.find(_.topic == topic).get
+          val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get
+          delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
+                                                                 durationNs,
+                                                                 partitionData.sizeInBytes)
+          maybeUnblockDelayedFetchRequests(topic, Array(partitionData))
         }
       }
 

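The delayed-produce check above now delegates the ISR ack counting to Partition.checkEnoughReplicasReachOffset, which folds both the not-leader case and the caught-up test into a single (hasEnough, errorCode) result. The following self-contained sketch illustrates the shape of that check; SketchPartition, SketchReplica, and the numeric error codes are simplified stand-ins invented for illustration, not the actual 0.8 classes or ErrorMapping values.

    object AckCheckSketch {
      val NoError: Short = 0
      val NotLeaderCode: Short = 6 // illustrative value only

      case class SketchReplica(brokerId: Int, logEndOffset: Long)

      class SketchPartition(leaderIsLocal: Boolean, inSyncReplicas: Set[SketchReplica]) {
        // requiredAcks < 0 means acknowledge only after every ISR member has
        // caught up to requiredOffset; requiredAcks > 0 means after that many have.
        def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
          if (!leaderIsLocal)
            (false, NotLeaderCode)
          else {
            val numAcks = inSyncReplicas.count(_.logEndOffset >= requiredOffset)
            val hasEnough = (requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
                            (requiredAcks > 0 && numAcks >= requiredAcks)
            (hasEnough, NoError)
          }
        }
      }

      def main(args: Array[String]) {
        val p = new SketchPartition(true,
          Set(SketchReplica(0, 120L), SketchReplica(1, 100L), SketchReplica(2, 90L)))
        println(p.checkEnoughReplicasReachOffset(100L, 2))   // (true,0): two replicas reached offset 100
        println(p.checkEnoughReplicasReachOffset(100L, -1))  // (false,0): replica 2 is still behind
      }
    }

Centralizing this in Partition keeps KafkaApis free of ISR bookkeeping and gives the success and failure paths one uniform return shape.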
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Aug 21 17:17:29 2012
@@ -23,12 +23,8 @@ import kafka.log.LogManager
 import kafka.utils._
 import java.util.concurrent._
 import atomic.AtomicBoolean
-import kafka.cluster.Replica
-import kafka.api.LeaderAndISR
-import scala.collection._
 import org.I0Itec.zkclient.ZkClient
 
-
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
@@ -94,11 +90,10 @@ class KafkaServer(val config: KafkaConfi
 
     info("Connecting to ZK: " + config.zkConnect)
 
-    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, deleteLog)
+    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
 
     kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
-    apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper,
-                         addReplica, stopReplica, makeLeader, makeFollower, config.brokerId)
+    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
     Mx4jLoader.maybeLoad
 
@@ -147,31 +142,6 @@ class KafkaServer(val config: KafkaConfi
    */
   def awaitShutdown(): Unit = shutdownLatch.await()
 
-  def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = {
-    val log = logManager.getOrCreateLog(topic, partition)
-    replicaManager.addLocalReplica(topic, partition, log, assignedReplicas)
-  }
-
-  def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
-    replicaManager.makeLeader(replica, leaderAndISR)
-  }
-
-  def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
-    replicaManager.makeFollower(replica, leaderAndISR)
-  }
-
-  def getReplica(topic: String, partition: Int): Option[Replica] =
-    replicaManager.getReplica(topic, partition)
-
-  def stopReplica(topic: String, partition: Int): Short = {
-    replicaManager.stopReplica(topic, partition)
-  }
-
-  def deleteLog(topic: String,  partition: Int): Unit = {
-    /* TODO: handle deleteLog in a better way */
-    //logManager.deleteLog(topic, partition)
-  }
-
   def getLogManager(): LogManager = logManager
 
   def getStats(): SocketServerStats = socketServer.stats

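The delegation methods deleted above (addReplica, stopReplica, makeLeader, makeFollower) are absorbed into ReplicaManager and Partition; in particular, addReplica's pairing of log creation with replica registration now happens behind Partition.getOrCreateReplica. A rough sketch of that idiom, assuming simplified types (SketchLog, SketchReplica, and SketchPartition are invented for illustration):

    object GetOrCreateReplicaSketch {
      class SketchLog(val topic: String, val partition: Int)
      class SketchReplica(val brokerId: Int, val log: Option[SketchLog])

      class SketchPartition(topic: String, partitionId: Int, localBrokerId: Int) {
        private val replicas = collection.mutable.Map[Int, SketchReplica]()

        // Local replicas are created with a log attached; remote replicas are
        // tracked by broker id only. This replaces the old two-step
        // KafkaServer.addReplica (getOrCreateLog followed by addLocalReplica).
        def getOrCreateReplica(replicaId: Int): SketchReplica =
          replicas.getOrElseUpdate(replicaId, {
            val log = if (replicaId == localBrokerId) Some(new SketchLog(topic, partitionId)) else None
            new SketchReplica(replicaId, log)
          })
      }

      def main(args: Array[String]) {
        val partition = new SketchPartition("test", 0, localBrokerId = 1)
        println(partition.getOrCreateReplica(1).log.isDefined) // true: the local replica gets a log
        println(partition.getOrCreateReplica(2).log.isDefined) // false: a remote replica has none
      }
    }

With the replica lifecycle owned by one object, KafkaServer no longer needs to hand KafkaApis a callback per state transition.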
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Tue Aug 21 17:17:29 2012
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import java.net.InetAddress
 import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
@@ -25,8 +24,7 @@ import kafka.common._
 
 
 /**
- * Handles the server's interaction with zookeeper. The server needs to register the following paths:
- *   /topics/[topic]/[node_id-partition_num]
+ * Handles registering broker with zookeeper in the following path:
  *   /brokers/[0...N] --> host:port
  */
 class KafkaZooKeeper(config: KafkaConfig) extends Logging {
@@ -82,28 +80,6 @@ class KafkaZooKeeper(config: KafkaConfig
     }
   }
 
-  private def doesTopicExistInCluster(topic: String) : Boolean = {
-    val allTopics = ZkUtils.getAllTopics(zkClient)
-    trace("all topics, %s, topic %s".format(allTopics, topic))
-    allTopics.contains(topic)
-  }
-
-  def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
-    if(!doesTopicExistInCluster(topic))
-      throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
-    // check if partition id is invalid
-    if(partition < 0)
-      throw new InvalidPartitionException("Partition %d is invalid".format(partition))
-    ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
-      case Some(leader) =>
-        if(leader != config.brokerId)
-          throw new LeaderNotAvailableException("Broker %d is not leader for partition %d for topic %s"
-            .format(config.brokerId, partition, topic))
-      case None =>
-        throw new LeaderNotAvailableException("There is no leader for topic %s partition %d".format(topic, partition))
-    }
-  }
-
   def getZookeeperClient = {
     zkClient
   }

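The removed ensurePartitionLeaderOnThisBroker cost two ZooKeeper reads (getAllTopics plus getLeaderForPartition) on every fetch and offset request; its replacement, replicaManager.getLeaderReplicaIfLocal, answers the same question from the broker's in-memory partition map and signals failure with UnknownTopicOrPartitionException or LeaderNotAvailableException. A minimal sketch of how a request handler might translate those exceptions into response codes; the handler, the stub, and the numeric codes below are illustrative, not the actual ErrorMapping values:

    object LeaderCheckSketch {
      class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)
      class LeaderNotAvailableException(msg: String) extends RuntimeException(msg)

      val NoError: Short = 0
      val UnknownTopicOrPartitionCode: Short = 3 // illustrative
      val LeaderNotAvailableCode: Short = 5      // illustrative

      // Stand-in for replicaManager.getLeaderReplicaIfLocal: answers from
      // local state instead of issuing ZooKeeper reads per request.
      def getLeaderReplicaIfLocal(topic: String, partition: Int): String =
        if (topic != "known") throw new UnknownTopicOrPartitionException(topic)
        else if (partition != 0) throw new LeaderNotAvailableException(topic + "/" + partition)
        else "local-leader-replica"

      def handleFetch(topic: String, partition: Int): Short =
        try {
          getLeaderReplicaIfLocal(topic, partition)
          NoError
        } catch {
          case e: UnknownTopicOrPartitionException => UnknownTopicOrPartitionCode
          case e: LeaderNotAvailableException => LeaderNotAvailableCode
        }

      def main(args: Array[String]) {
        println(handleFetch("known", 0))   // 0: this broker leads the partition
        println(handleFetch("known", 7))   // 5: no local leader for the partition
        println(handleFetch("missing", 0)) // 3: unknown topic or partition
      }
    }

Answering from local state does mean the broker's view can briefly lag ZooKeeper, which is why leader transitions flow through the controller-driven LeaderAndISR request path in ReplicaManager below rather than ad hoc ZK reads.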
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Tue Aug 21 17:17:29 2012
@@ -33,15 +33,15 @@ class ReplicaFetcherThread(name:String, 
     val replica = replicaMgr.getReplica(topic, partitionId).get
     val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
 
-    if (fetchOffset != replica.logEndOffset())
-      throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset()))
+    if (fetchOffset != replica.logEndOffset)
+      throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset))
     trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d".format(replica.brokerId,
-      replica.logEndOffset(), messageSet.sizeInBytes, partitionData.hw))
+      replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw))
     replica.log.get.append(messageSet)
     trace("Follower %d has replica log end offset %d after appending %d messages"
-      .format(replica.brokerId, replica.logEndOffset(), messageSet.sizeInBytes))
-    val followerHighWatermark = replica.logEndOffset().min(partitionData.hw)
-    replica.highWatermark(Some(followerHighWatermark))
+      .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
+    val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
+    replica.highWatermark = followerHighWatermark
     trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
       .format(replica.brokerId, topic, partitionId, followerHighWatermark))
   }

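The mechanical edits in this file track a Replica API cleanup: logEndOffset and highWatermark change from Option-taking getter/setter methods (replica.highWatermark(Some(hw))) to plain field-style accessors (replica.highWatermark = hw). In Scala, the usual way to get that assignment syntax is a private volatile var behind a getter/setter pair; a minimal sketch of the pattern, assuming a simplified Replica:

    object ReplicaFieldSketch {
      class SketchReplica(val brokerId: Int) {
        @volatile private var highWatermarkValue: Long = 0L

        // Field-style accessors: callers write `replica.highWatermark = n`
        // and read `replica.highWatermark`, with no Option wrapping.
        def highWatermark: Long = highWatermarkValue
        def highWatermark_=(newValue: Long) {
          highWatermarkValue = newValue
        }
      }

      def main(args: Array[String]) {
        val r = new SketchReplica(1)
        r.highWatermark = 42L    // sugar for r.highWatermark_=(42L)
        println(r.highWatermark) // 42
      }
    }

The min with the leader's hw in the hunk above then reads naturally: a follower never advances its high watermark past its own log end offset or past what the leader reports as committed.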
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1375670&r1=1375669&r2=1375670&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Tue Aug 21 17:17:29 2012
@@ -16,313 +16,205 @@
  */
 package kafka.server
 
-import kafka.log.Log
 import kafka.cluster.{Partition, Replica}
 import collection._
-import mutable.ListBuffer
 import org.I0Itec.zkclient.ZkClient
-import java.util.concurrent.locks.ReentrantLock
-import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
-import kafka.api.LeaderAndISR
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{BrokerNotExistException, KafkaException, ErrorMapping, InvalidPartitionException}
+import kafka.utils._
+import kafka.log.LogManager
+import kafka.api.{LeaderAndISRRequest, LeaderAndISR}
+import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
 
+object ReplicaManager {
+  val UnknownLogEndOffset = -1L
+}
 
-class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, deleteLocalLog: (String, Int) => Unit) extends Logging {
-
-  var allPartitions = new mutable.HashMap[(String, Int), Partition]()
-  private var leaderReplicas = new ListBuffer[Partition]()
-  private val leaderReplicaLock = new ReentrantLock()
-  private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
-  this.logIdent = "Replica Manager on Broker " + config.brokerId + ", "
-
-  val hwCheckPointThreadStarted = new AtomicBoolean(false)
-  private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
-  info("Created highwatermark file %s".format(highwaterMarkCheckpoint.name))
+class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler,
+                     val logManager: LogManager) extends Logging {
+  private val allPartitions = new Pool[(String, Int), Partition]
+  private var leaderPartitions = new mutable.HashSet[Partition]()
+  private val leaderPartitionsLock = new Object
+  val replicaFetcherManager = new ReplicaFetcherManager(config, this)
+  this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
+
+  private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
+  val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
+  info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
 
   def startHighWaterMarksCheckPointThread() = {
-    if(hwCheckPointThreadStarted.compareAndSet(false, true))
-      kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
+    if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
+      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
   }
 
   def startup() {
-    // start the highwatermark checkpoint thread
     // start ISR expiration thread
     kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
   }
 
-  def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
-    val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
-    var retReplica : Replica = null
-    val replicaOpt = partition.getReplica(config.brokerId)
-    replicaOpt match {
+  def stopReplica(topic: String, partitionId: Int): Short  = {
+    trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId))
+    val errorCode = ErrorMapping.NoError
+    getReplica(topic, partitionId) match {
       case Some(replica) =>
-        info("changing remote replica %s into a local replica".format(replica.toString))
-        replica.log match {
-          case None =>
-            replica.log = Some(log)
-          case Some(log) => // nothing to do since log already exists
+        replicaFetcherManager.removeFetcher(topic, partitionId)
+        /* TODO: handle deleteLog in a better way */
+        //logManager.deleteLog(topic, partition)
+        leaderPartitionsLock synchronized {
+          leaderPartitions -= replica.partition
         }
-        retReplica = replica
-      case None =>
-        val localReplica = new Replica(config.brokerId, partition, topic, time,
-                                       Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
-        partition.addReplica(localReplica)
-        info("adding local replica %d for topic %s partition %s on broker %d".format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
-        retReplica = localReplica
+        allPartitions.remove((topic, partitionId))
+        info("After removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
+      case None => //do nothing if replica no longer exists
     }
-    val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get)
-    partition.assignedReplicas(Some(assignedReplicas))
-    // get the replica objects for the assigned replicas for this partition
-    retReplica
+    trace("Finish handling stop replica [%s, %d]".format(topic, partitionId))
+    errorCode
   }
 
-  def stopReplica(topic: String, partition: Int): Short  = {
-    trace("handling stop replica for partition [%s, %d]".format(topic, partition))
-    val errorCode = ErrorMapping.NoError
-    val replica = getReplica(topic, partition)
-    if(replica.isDefined){
-      replicaFetcherManager.removeFetcher(topic, partition)
-      deleteLocalLog(topic, partition)
-      allPartitions.remove((topic, partition))
-      info("after removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partition, allPartitions))
+  def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
+    var partition = allPartitions.get((topic, partitionId))
+    if (partition == null) {
+      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
+      partition = allPartitions.get((topic, partitionId))
     }
-    trace("finishes handling stop replica [%s, %d]".format(topic, partition))
-    errorCode
+    partition
   }
 
-
-  def getOrCreatePartition(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = {
-    val newPartition = allPartitions.contains((topic, partitionId))
-    newPartition match {
-      case true => // partition exists, do nothing
-        allPartitions.get((topic, partitionId)).get
-      case false => // create remote replicas for each replica id in assignedReplicas
-        val partition = new Partition(topic, partitionId, time)
-        allPartitions += (topic, partitionId) -> partition
-        (assignedReplicaIds - config.brokerId).foreach(
-          replicaId => addRemoteReplica(topic, partitionId, replicaId, partition))
-        partition
-    }
+  def getPartition(topic: String, partitionId: Int): Option[Partition] = {
+    val partition = allPartitions.get((topic, partitionId))
+    if (partition == null)
+      None
+    else
+      Some(partition)
   }
 
-  def ensurePartitionExists(topic: String, partitionId: Int): Partition = {
-    val partitionOpt = allPartitions.get((topic, partitionId))
+  def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica =  {
+    val partitionOpt = getPartition(topic, partitionId)
     partitionOpt match {
-      case Some(partition) => partition
       case None =>
-        throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d".format(topic, partitionId, config.brokerId))
+        throw new UnknownTopicOrPartitionException("Topic %s partition %d doesn't exist on %d".format(topic, partitionId, config.brokerId))
+      case Some(partition) =>
+        partition.leaderReplicaIfLocal match {
+          case Some(leaderReplica) => leaderReplica
+          case None =>
+            throw new LeaderNotAvailableException("Leader not local for topic %s partition %d on broker %d"
+                    .format(topic, partitionId, config.brokerId))
+        }
     }
   }
 
-  def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = {
-    val remoteReplica = new Replica(replicaId, partition, topic, time)
-
-    val replicaAdded = partition.addReplica(remoteReplica)
-    if(replicaAdded)
-      info("added remote replica %d for topic %s partition %s".format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId))
-    remoteReplica
+  def getOrCreateReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Replica =  {
+    getOrCreatePartition(topic, partitionId).getOrCreateReplica(replicaId)
   }
 
-  def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = {
-    val partitionOpt = allPartitions.get((topic, partitionId))
+  def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] =  {
+    val partitionOpt = getPartition(topic, partitionId)
     partitionOpt match {
-      case Some(partition) =>
-        partition.getReplica(replicaId)
-      case None =>
-        None
+      case None => None
+      case Some(partition) => partition.getReplica(replicaId)
     }
   }
 
-  def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = {
-    val replicasOpt = allPartitions.get((topic, partitionId))
-    replicasOpt match {
-      case Some(replicas) =>
-        Some(replicas.leaderReplica())
-      case None =>
-        throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
-                                         "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndISRRequest): collection.Map[(String, Int), Short] = {
+    info("Handling leader and isr request %s".format(leaderAndISRRequest))
+    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+
+    for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
+      var errorCode = ErrorMapping.NoError
+      val topic = partitionInfo._1
+      val partitionId = partitionInfo._2
+
+      val requestedLeaderId = leaderAndISR.leader
+      try {
+        if(requestedLeaderId == config.brokerId)
+          makeLeader(topic, partitionId, leaderAndISR)
+        else
+          makeFollower(topic, partitionId, leaderAndISR)
+      } catch {
+        case e =>
+          error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
+          errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+      }
+      responseMap.put(partitionInfo, errorCode)
     }
-  }
 
-  def getPartition(topic: String, partitionId: Int): Option[Partition] =
-    allPartitions.get((topic, partitionId))
+    /**
+     *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
+     *  as deleted.
+     */
+    if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){
+      startHighWaterMarksCheckPointThread
+      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
+      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
+      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
+    }
+
+    responseMap
+  }
 
-  private def updateReplicaLeo(replica: Replica, fetchOffset: Long) {
-    // set the replica leo
-    val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
-    partition.updateReplicaLeo(replica, fetchOffset)
-  }
-
-  private def maybeIncrementLeaderHW(replica: Replica) {
-    // set the replica leo
-    val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
-    // set the leader HW to min of the leo of all replicas
-    val allLeos = partition.inSyncReplicas.map(_.logEndOffset())
-    val newHw = allLeos.min
-    val oldHw = partition.leaderHW()
-    if(newHw > oldHw) {
-      partition.leaderHW(Some(newHw))
-    }else
-      debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s".format(replica.topic,
-                                                                                            replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
-  }
-
-  def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
-    info("becoming Leader for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
-    info("started the leader state transition for topic %s partition %d"
-                 .format(replica.topic, replica.partition.partitionId))
-    try {
-      // read and cache the ISR
-      replica.partition.leaderId(Some(replica.brokerId))
-      replica.partition.updateISR(leaderAndISR.ISR.toSet)
-      // stop replica fetcher thread, if any
-      replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
+  private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
+    info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
+    val partition = getOrCreatePartition(topic, partitionId)
+    if (partition.makeLeader(topic, partitionId, leaderAndISR)) {
       // also add this partition to the list of partitions for which the leader is the current broker
-      leaderReplicaLock.lock()
-      leaderReplicas += replica.partition
-      info("completed the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId))
-      ErrorMapping.NoError
-    }catch {
-      case e => error("failed to complete the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId), e)
-      ErrorMapping.UnknownCode
-      /* TODO: add specific error code */
-    }finally {
-      leaderReplicaLock.unlock()
+      leaderPartitionsLock synchronized {
+        leaderPartitions += partition
+      } 
     }
+    info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
   }
 
-
-  def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
+  private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) {
     val leaderBrokerId: Int = leaderAndISR.leader
-    info("starting the follower state transition to follow leader %d for topic %s partition %d"
-                 .format(leaderBrokerId, replica.topic, replica.partition.partitionId))
-    try {
-      // set the leader for this partition correctly on this broker
-      replica.partition.leaderId(Some(leaderBrokerId))
-      replica.log match {
-        case Some(log) =>  // log is already started
-          log.truncateTo(replica.highWatermark())
-        case None =>
-      }
-      debug("for partition [%s, %d], the leaderBroker is [%d]".format(replica.topic, replica.partition.partitionId, leaderAndISR.leader))
-      // get leader for this replica
-      val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
-      val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
-      // become follower only if it is not already following the same leader
-      if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
-        info("becoming follower to leader %d for topic %s partition %d".format(leaderBrokerId, replica.topic, replica.partition.partitionId))
-        // stop fetcher thread to previous leader
-        replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
-        // start fetcher thread to current leader
-        replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker)
-      }
+    info("Starting the follower state transition to follow leader %d for topic %s partition %d"
+                 .format(leaderBrokerId, topic, partitionId))
+
+    val partition = getOrCreatePartition(topic, partitionId)
+    if (partition.makeFollower(topic, partitionId, leaderAndISR)) {
       // remove this replica's partition from the ISR expiration queue
-      leaderReplicaLock.lock()
-      leaderReplicas -= replica.partition
-      info("completed the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId))
-      ErrorMapping.NoError
-    } catch {
-      case e: BrokerNotExistException =>
-        error("failed to complete the follower state transition to follow leader %d for topic %s partition %d because the leader broker does not exist in the cluster".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
-        ErrorMapping.BrokerNotExistInZookeeperCode
-      case e =>
-        error("failed to complete the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
-        ErrorMapping.UnknownCode
-    }finally {
-      leaderReplicaLock.unlock()
+      leaderPartitionsLock synchronized {
+        leaderPartitions -= partition
+      }
     }
   }
 
   private def maybeShrinkISR(): Unit = {
-    try {
-      info("evaluating ISR list of partitions to see which replicas can be removed from the ISR")
-      leaderReplicaLock.lock()
-      leaderReplicas.foreach(partition => {
-        val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)
-        if(outOfSyncReplicas.size > 0) {
-          val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
-          assert(newInSyncReplicas.size > 0)
-          info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
-          // update ISR in zk and in memory
-          partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient))
-        }
-      })
-    }catch {
-      case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1)
-    }finally {
-      leaderReplicaLock.unlock()
-    }
-  }
-
-  private def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
-    val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
-    if(partition.inSyncReplicas.contains(replica)) false
-    else if(partition.assignedReplicas().contains(replica)) {
-      val leaderHW = partition.leaderHW()
-      replica.logEndOffset() >= leaderHW
-    }
-    else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) + " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
-  }
-
-  def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = {
-    val replicaOpt = getReplica(topic, partition, replicaId)
-    replicaOpt match {
-      case Some(replica) =>
-        updateReplicaLeo(replica, offset)
-        // check if this replica needs to be added to the ISR
-        if(checkIfISRCanBeExpanded(replica)) {
-          val newISR = replica.partition.inSyncReplicas + replica
-          // update ISR in ZK and cache
-          replica.partition.updateISR(newISR.map(_.brokerId), Some(zkClient))
-        }
-        debug("Recording follower %d position %d for topic %s partition %d".format(replicaId, offset, topic, partition))
-        maybeIncrementLeaderHW(replica)
-      case None =>
-        throw new KafkaException("No replica %d in replica manager on %d".format(replicaId, config.brokerId))
+    trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
+    leaderPartitionsLock synchronized {
+      leaderPartitions.foreach(partition => partition.maybeShrinkISR(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes))
     }
   }
 
-  def recordLeaderLogEndOffset(topic: String, partition: Int, logEndOffset: Long) = {
-    val replicaOpt = getReplica(topic, partition, config.brokerId)
-    replicaOpt match {
-      case Some(replica) => replica.logEndOffset(Some(logEndOffset))
-      case None =>
-        throw new KafkaException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
-    }
+  def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
+    val partition = getOrCreatePartition(topic, partitionId)
+    partition.updateLeaderHWAndMaybeExpandISR(replicaId, offset)
   }
 
   /**
    * Flushes the highwatermark value for all partitions to the highwatermark file
    */
-  def checkpointHighwaterMarks() {
-    val highwaterMarksForAllPartitions = allPartitions.map
-            { partition =>
-              val topic = partition._1._1
-              val partitionId = partition._1._2
-              val localReplicaOpt = partition._2.getReplica(config.brokerId)
-              val hw = localReplicaOpt match {
-                case Some(localReplica) => localReplica.highWatermark()
-                case None =>
-                  error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) + " Replica metadata doesn't exist")
-                  0L
-              }
-              (topic, partitionId) -> hw
-            }.toMap
-    highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions)
-    info("Checkpointed high watermark data: %s".format(highwaterMarksForAllPartitions))
+  def checkpointHighWatermarks() {
+    val highWatermarksForAllPartitions = allPartitions.map {
+      partition =>
+        val topic = partition._1._1
+        val partitionId = partition._1._2
+        val localReplicaOpt = partition._2.getReplica(config.brokerId)
+        val hw = localReplicaOpt match {
+          case Some(localReplica) => localReplica.highWatermark
+          case None =>
+            error("Highwatermark for topic %s partition %d doesn't exist during checkpointing"
+                  .format(topic, partitionId))
+            0L
+        }
+        (topic, partitionId) -> hw
+    }.toMap
+    highWatermarkCheckpoint.write(highWatermarksForAllPartitions)
+    trace("Checkpointed high watermark data: %s".format(highWatermarksForAllPartitions))
   }
 
-  /**
-   * Reads the checkpointed highWatermarks for all partitions
-   * @return checkpointed value of highwatermark for topic, partition. If one doesn't exist, returns 0
-   */
-  def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition)
-
   def shutdown() {
-    info("shut down")
+    info("Shut down")
     replicaFetcherManager.shutdown()
-    checkpointHighwaterMarks()
-    info("shuttedd down completely")
+    checkpointHighWatermarks()
+    info("Shutted down completely")
   }
 }

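getOrCreatePartition above leans on Pool, a thin wrapper over a concurrent map whose putIfNotExists makes create-if-absent safe under contention without an explicit lock. A minimal self-contained sketch of the same idiom directly on java.util.concurrent.ConcurrentHashMap (this shows the general pattern, not kafka.utils.Pool itself):

    import java.util.concurrent.ConcurrentHashMap

    object PoolSketch {
      private val partitions = new ConcurrentHashMap[(String, Int), String]()

      // Two threads may both observe a missing key; putIfAbsent guarantees
      // only one insertion wins, and both callers then read back the winner.
      def getOrCreate(topic: String, partitionId: Int): String = {
        val key = (topic, partitionId)
        var p = partitions.get(key)
        if (p == null) {
          partitions.putIfAbsent(key, "partition-" + topic + "-" + partitionId)
          p = partitions.get(key)
        }
        p
      }

      def main(args: Array[String]) {
        println(getOrCreate("foo", 0)) // created on first call
        println(getOrCreate("foo", 0)) // same value returned on second call
      }
    }

A losing thread's freshly constructed value is simply discarded, which is fine as long as constructing the value has no external side effects.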

