kafka-commits mailing list archives

From: nehanarkh...@apache.org
Subject: svn commit: r1397372 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/controller/ main/scala/kafka/server/ test/scala/unit/kafka/api/ test/scala/unit/kafka/server/
Date: Thu, 11 Oct 2012 23:30:23 GMT
Author: nehanarkhede
Date: Thu Oct 11 23:30:23 2012
New Revision: 1397372

URL: http://svn.apache.org/viewvc?rev=1397372&view=rev
Log:
KAFKA-510 Broker needs to know replication factor per partition; patched by Yang Ye; reviewed
by Neha Narkhede, Jun Rao and Joel Koshy
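
For orientation, a minimal sketch of the data flow this patch introduces (simplified stand-ins that reuse names from the diff below, not the committed code itself): the controller now wraps each partition's LeaderAndIsr together with that partition's replication factor in a PartitionStateInfo, so a broker receiving a LeaderAndIsrRequest can judge under-replication against the partition's own factor instead of config.defaultReplicationFactor.

    case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

    // New in this patch: the leader/ISR state travels with the replication factor.
    case class PartitionStateInfo(leaderAndIsr: LeaderAndIsr, replicationFactor: Int)

    object ReplicationFactorSketch {
      def main(args: Array[String]): Unit = {
        val assignedReplicas = List(1, 2, 3)  // hypothetical replica assignment
        val state = PartitionStateInfo(
          LeaderAndIsr(leader = 1, leaderEpoch = 0, isr = assignedReplicas, zkVersion = 0),
          replicationFactor = assignedReplicas.size)

        // Broker side: Partition.isUnderReplicated now compares the ISR size
        // against the per-partition replication factor carried in the request.
        val underReplicated = state.leaderAndIsr.isr.size < state.replicationFactor
        println("under-replicated: " + underReplicated)  // prints: under-replicated: false
      }
    }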

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala Thu Oct 11 23:30:23 2012
@@ -44,7 +44,7 @@ object LeaderAndISRResponse {
 case class LeaderAndISRResponse(versionId: Short,
                                 responseMap: Map[(String, Int), Short],
                                 errorCode: Short = ErrorMapping.NoError)
-        extends RequestOrResponse{
+        extends RequestOrResponse {
   def sizeInBytes(): Int ={
     var size = 2 + 2 + 4
     for ((key, value) <- responseMap){

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Thu Oct 11 23:30:23 2012
@@ -27,38 +27,46 @@ import collection.mutable.HashMap
 object LeaderAndIsr {
   val initialLeaderEpoch: Int = 0
   val initialZKVersion: Int = 0
-  def readFrom(buffer: ByteBuffer): LeaderAndIsr = {
+}
+
+case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
+  def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion)
+
+  override def toString(): String = {
+    val jsonDataMap = new HashMap[String, String]
+    jsonDataMap.put("leader", leader.toString)
+    jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
+    jsonDataMap.put("ISR", isr.mkString(","))
+    Utils.stringMapToJsonString(jsonDataMap)
+  }
+}
+
+
+object PartitionStateInfo {
+  def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
     val leader = buffer.getInt
     val leaderGenId = buffer.getInt
     val ISRString = Utils.readShortString(buffer, "UTF-8")
     val ISR = ISRString.split(",").map(_.toInt).toList
     val zkVersion = buffer.getInt
-    new LeaderAndIsr(leader, leaderGenId, ISR, zkVersion)
+    val replicationFactor = buffer.getInt
+    PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, ISR, zkVersion), replicationFactor)
   }
 }
 
-case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int){
-  def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion)
-
+case class PartitionStateInfo(val leaderAndIsr: LeaderAndIsr, val replicationFactor: Int) {
   def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(leader)
-    buffer.putInt(leaderEpoch)
-    Utils.writeShortString(buffer, isr.mkString(","), "UTF-8")
-    buffer.putInt(zkVersion)
+    buffer.putInt(leaderAndIsr.leader)
+    buffer.putInt(leaderAndIsr.leaderEpoch)
+    Utils.writeShortString(buffer, leaderAndIsr.isr.mkString(","), "UTF-8")
+    buffer.putInt(leaderAndIsr.zkVersion)
+    buffer.putInt(replicationFactor)
   }
 
   def sizeInBytes(): Int = {
-    val size = 4 + 4 + (2 + isr.mkString(",").length) + 4
+    val size = 4 + 4 + (2 + leaderAndIsr.isr.mkString(",").length) + 4 + 4
     size
   }
-
-  override def toString(): String = {
-    val jsonDataMap = new HashMap[String, String]
-    jsonDataMap.put("leader", leader.toString)
-    jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
-    jsonDataMap.put("ISR", isr.mkString(","))
-    Utils.stringMapToJsonString(jsonDataMap)
-  }
 }
 
 
@@ -73,17 +81,17 @@ object LeaderAndIsrRequest {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
-    val leaderAndISRRequestCount = buffer.getInt
-    val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr]
+    val partitionStateInfosCount = buffer.getInt
+    val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo]
 
-    for(i <- 0 until leaderAndISRRequestCount){
+    for(i <- 0 until partitionStateInfosCount){
       val topic = Utils.readShortString(buffer, "UTF-8")
       val partition = buffer.getInt
-      val leaderAndISRRequest = LeaderAndIsr.readFrom(buffer)
+      val partitionStateInfo = PartitionStateInfo.readFrom(buffer)
 
-      leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
+      partitionStateInfos.put((topic, partition), partitionStateInfo)
     }
-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, leaderAndISRInfos)
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos)
   }
 }
 
@@ -91,19 +99,19 @@ object LeaderAndIsrRequest {
 case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
                                 ackTimeoutMs: Int,
-                                leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
+                                partitionStateInfos: Map[(String, Int), PartitionStateInfo])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo]) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
-    buffer.putInt(leaderAndISRInfos.size)
-    for((key, value) <- leaderAndISRInfos){
+    buffer.putInt(partitionStateInfos.size)
+    for((key, value) <- partitionStateInfos){
       Utils.writeShortString(buffer, key._1, "UTF-8")
       buffer.putInt(key._2)
       value.writeTo(buffer)
@@ -112,7 +120,7 @@ case class LeaderAndIsrRequest (versionI
 
   def sizeInBytes(): Int = {
     var size = 1 + 2 + (2 + clientId.length) + 4 + 4
-    for((key, value) <- leaderAndISRInfos)
+    for((key, value) <- partitionStateInfos)
       size += (2 + key._1.length) + 4 + value.sizeInBytes
     size
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Thu Oct 11 23:30:23 2012
@@ -21,15 +21,17 @@ import kafka.utils._
 import java.lang.Object
 import kafka.api.LeaderAndIsr
 import kafka.server.ReplicaManager
-import kafka.common.ErrorMapping
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
+import kafka.common.ErrorMapping
+
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
  */
 class Partition(val topic: String,
                 val partitionId: Int,
+                var replicationFactor: Int,
                 time: Time,
                val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
   private val localBrokerId = replicaManager.config.brokerId
@@ -57,8 +59,7 @@ class Partition(val topic: String,
   )
 
   def isUnderReplicated(): Boolean = {
-    // TODO: need to pass in replication factor from controller
-    inSyncReplicas.size < replicaManager.config.defaultReplicationFactor
+    inSyncReplicas.size < replicationFactor
   }
 
   def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
@@ -292,7 +293,7 @@ class Partition(val topic: String,
     info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(",
")))
     val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r =>
r.brokerId).toList, zkVersion)
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString(), zkVersion)
     if (updateSucceeded){
       inSyncReplicas = newISR
       zkVersion = newVersion

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Thu Oct 11 23:30:23 2012
@@ -67,7 +67,7 @@ class Replica(val brokerId: Int,
   def highWatermark_=(newHighWatermark: Long) {
     if (isLocal) {
       trace("Setting hw for replica %d topic %s partition %d on broker %d to %d"
-              .format(brokerId, topic, partitionId, newHighWatermark))
+              .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
       highWatermarkValue.set(newHighWatermark)
     } else
      throw new KafkaException("Unable to set highwatermark for replica %d topic %s partition %d since it's not local"

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala Thu Oct 11 23:30:23 2012
@@ -139,7 +139,7 @@ class RequestSendThread(val controllerId
 
 class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
   extends  Logging {
-  val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
+  val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
 
   def newBatch() {
@@ -151,10 +151,12 @@ class ControllerBrokerRequestBatch(sendR
     stopReplicaRequestMap.clear()
   }
 
-  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) {
+  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr, replicationFactor: Int) {
     brokerIds.foreach { brokerId =>
-      leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])
-      leaderAndIsrRequestMap(brokerId).put((topic, partition), leaderAndIsr)
+      leaderAndIsrRequestMap.getOrElseUpdate(brokerId,
+                                             new mutable.HashMap[(String, Int), PartitionStateInfo])
+      leaderAndIsrRequestMap(brokerId).put((topic, partition),
+                                           PartitionStateInfo(leaderAndIsr, replicationFactor))
     }
   }
 
@@ -168,8 +170,8 @@ class ControllerBrokerRequestBatch(sendR
   def sendRequestsToBrokers() {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
-      val leaderAndIsr = m._2
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
+      val partitionStateInfos = m._2
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos)
       debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala Thu Oct 11 23:30:23 2012
@@ -234,14 +234,14 @@ class PartitionStateMachine(controller: 
         debug("Live assigned replicas for partition [%s, %d] are: [%s]".format(topic, partition,
liveAssignedReplicas))
         // make the first replica in the list of assigned replicas, the leader
         val leader = liveAssignedReplicas.head
-        var leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
+        val leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString)
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString())
           // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr, replicaAssignment.size)
           controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader)
           partitionState.put((topic, partition), OnlinePartition)
         }catch {
@@ -283,7 +283,8 @@ class PartitionStateMachine(controller: 
       controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
       info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader,
topic, partition))
       // store new leader and isr info in cache
-      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr)
+      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition,
+                                                          topic, partition, newLeaderAndIsr, controllerContext.partitionReplicaAssignment((topic, partition)).size)
     }catch {
       case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
         .format(topic, partition) + " Marking this partition offline", poe)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala Thu Oct 11 23:30:23 2012
@@ -43,7 +43,7 @@ class ReplicaStateMachine(controller: Ka
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
-  private var isShuttingDown = new AtomicBoolean(false)
+  private val isShuttingDown = new AtomicBoolean(false)
 
   /**
    * Invoked on successful controller election. First registers a broker change listener since that triggers all
@@ -101,6 +101,7 @@ class ReplicaStateMachine(controller: Ka
   def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
     try {
       replicaState.getOrElseUpdate((topic, partition, replicaId), NonExistentReplica)
+      val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition))
       targetState match {
         case NewReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState)
@@ -111,7 +112,8 @@ class ReplicaStateMachine(controller: Ka
               if(leaderAndIsr.leader == replicaId)
                 throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
                   .format(replicaId, topic, partition) + "state as it is being requested to become leader")
-              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
+              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
+                                                                  topic, partition, leaderAndIsr, replicaAssignment.size)
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
@@ -143,7 +145,8 @@ class ReplicaStateMachine(controller: Ka
                 case Some(leaderAndIsr) =>
                   controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
                     case true => // leader is alive
-                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
+                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
+                                                                          topic, partition, leaderAndIsr, replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OnlineReplica)
                       info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId,
topic, partition))
                     case false => // ignore partitions whose leader is not alive
@@ -167,7 +170,7 @@ class ReplicaStateMachine(controller: Ka
                 info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition,
newLeaderAndIsr.toString()))
                 // update the new leadership decision in zookeeper or retry
                 val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-                  ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString,
+                  ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
                   leaderAndIsr.zkVersion)
                 newLeaderAndIsr.zkVersion = newVersion
                 zookeeperPathUpdateSucceeded = updateSucceeded
@@ -176,7 +179,8 @@ class ReplicaStateMachine(controller: Ka
             }
           }
           // send the shrunk ISR state change request only to the leader
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader),
+                                                              topic, partition, newLeaderAndIsr, replicaAssignment.size)
           // update the local leader and isr cache
           controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
           replicaState.put((topic, partition, replicaId), OfflineReplica)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Oct 11 23:30:23 2012
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import java.io.IOException
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
 import kafka.message._
@@ -26,7 +25,6 @@ import kafka.utils.{Pool, SystemTime, Lo
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
-import scala.math._
 import kafka.network.RequestChannel.Response
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
@@ -34,6 +32,7 @@ import kafka.metrics.KafkaMetricsGroup
 import org.I0Itec.zkclient.ZkClient
 import kafka.common._
 
+
 /**
  * Logic to handle the various Kafka requests
  */
@@ -132,10 +131,14 @@ class KafkaApis(val requestChannel: Requ
     produceRequest.data.foreach(partitionAndData =>
       maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2))
 
+    val allPartitionHaveReplicationFactorOne =
+      !produceRequest.data.keySet.exists(
+        m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
     if (produceRequest.requiredAcks == 0 ||
         produceRequest.requiredAcks == 1 ||
         produceRequest.numPartitions <= 0 ||
-        numPartitionsInError == produceRequest.numPartitions) {
+        allPartitionHaveReplicationFactorOne ||
+        numPartitionsInError == produceRequest.numPartitions){
       val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
       val response = ProducerResponse(produceRequest.versionId, produceRequest.correlationId, statuses)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
@@ -517,8 +520,13 @@ class KafkaApis(val requestChannel: Requ
       trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
         .format(topic, partitionId, fetchPartitionStatus.acksPending))
       if (fetchPartitionStatus.acksPending) {
-        val partition = replicaManager.getOrCreatePartition(topic, partitionId)
-        val (hasEnough, errorCode) = partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
+        val partitionOpt = replicaManager.getPartition(topic, partitionId)
+        val (hasEnough, errorCode) = partitionOpt match {
+          case Some(partition) =>
+            partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
+          case None =>
+            (false, ErrorMapping.UnknownTopicOrPartitionCode)
+        }
         if (errorCode != ErrorMapping.NoError) {
           fetchPartitionStatus.acksPending = false
           fetchPartitionStatus.error = errorCode

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Thu Oct 11 23:30:23 2012
@@ -22,11 +22,12 @@ import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils._
 import kafka.log.LogManager
-import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr}
-import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
+import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
+import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest}
+
 
 object ReplicaManager {
   val UnknownLogEndOffset = -1L
@@ -39,7 +40,6 @@ class ReplicaManager(val config: KafkaCo
   private val leaderPartitionsLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
-
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
   info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
@@ -69,6 +69,20 @@ class ReplicaManager(val config: KafkaCo
       kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
   }
 
+  /**
+   * This function is only used in two places: in Partition.updateISR() and KafkaApis.handleProducerRequest().
+   * In the former case, the partition should have been created, in the latter case, return -1 will put the request into purgatory
+   */
+  def getReplicationFactorForPartition(topic: String, partitionId: Int) = {
+    val partitionOpt = getPartition(topic, partitionId)
+    partitionOpt match {
+      case Some(partition) =>
+        partition.replicationFactor
+      case None =>
+        -1
+    }
+  }
+
   def startup() {
     // start ISR expiration thread
     kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
@@ -93,10 +107,10 @@ class ReplicaManager(val config: KafkaCo
     errorCode
   }
 
-  def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
+  def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
     var partition = allPartitions.get((topic, partitionId))
     if (partition == null) {
-      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
+      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this))
       partition = allPartitions.get((topic, partitionId))
     }
     partition
@@ -125,10 +139,6 @@ class ReplicaManager(val config: KafkaCo
     }
   }
 
-  def getOrCreateReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Replica =  {
-    getOrCreatePartition(topic, partitionId).getOrCreateReplica(replicaId)
-  }
-
   def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] =  {
     val partitionOpt = getPartition(topic, partitionId)
     partitionOpt match {
@@ -141,23 +151,23 @@ class ReplicaManager(val config: KafkaCo
     info("Handling leader and isr request %s".format(leaderAndISRRequest))
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
 
-    for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
+    for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos){
       var errorCode = ErrorMapping.NoError
-      val topic = partitionInfo._1
-      val partitionId = partitionInfo._2
+      val topic = topicAndPartition._1
+      val partitionId = topicAndPartition._2
 
-      val requestedLeaderId = leaderAndISR.leader
+      val requestedLeaderId = partitionStateInfo.leaderAndIsr.leader
       try {
         if(requestedLeaderId == config.brokerId)
-          makeLeader(topic, partitionId, leaderAndISR)
+          makeLeader(topic, partitionId, partitionStateInfo)
         else
-          makeFollower(topic, partitionId, leaderAndISR)
+          makeFollower(topic, partitionId, partitionStateInfo)
       } catch {
         case e =>
           error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
           errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
       }
-      responseMap.put(partitionInfo, errorCode)
+      responseMap.put(topicAndPartition, errorCode)
     }
 
     /**
@@ -167,7 +177,7 @@ class ReplicaManager(val config: KafkaCo
      */
 //    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
 //      startHighWaterMarksCheckPointThread
-//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
+//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.partitionStateInfos.contains(p._1)).map(entry => entry._1)
 //      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
 //      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
 //    }
@@ -175,10 +185,11 @@ class ReplicaManager(val config: KafkaCo
     responseMap
   }
 
-  private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = {
+  private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
+    val leaderAndIsr = partitionStateInfo.leaderAndIsr
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
-    val partition = getOrCreatePartition(topic, partitionId)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) {
+    val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
+    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, true)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -187,13 +198,14 @@ class ReplicaManager(val config: KafkaCo
     info("Completed the leader state transition for topic %s partition %d".format(topic,
partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) {
-    val leaderBrokerId: Int = leaderAndISR.leader
+  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) {
+    val leaderAndIsr = partitionStateInfo.leaderAndIsr
+    val leaderBrokerId: Int = leaderAndIsr.leader
     info("Starting the follower state transition to follow leader %d for topic %s partition
%d"
                  .format(leaderBrokerId, topic, partitionId))
 
-    val partition = getOrCreatePartition(topic, partitionId)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, false)) {
+    val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
+    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, false)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
@@ -209,8 +221,12 @@ class ReplicaManager(val config: KafkaCo
   }
 
   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
-    val partition = getOrCreatePartition(topic, partitionId)
-    partition.updateLeaderHWAndMaybeExpandISR(replicaId, offset)
+    val partitionOpt = getPartition(topic, partitionId)
+    if(partitionOpt.isDefined){
+      partitionOpt.get.updateLeaderHWAndMaybeExpandISR(replicaId, offset)
+    } else {
+      warn("While recording the follower position, the partition [%s, %d] hasn't been created,
skip updating leader HW".format(topic, partitionId))
+    }
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala Thu Oct 11 23:30:23 2012
@@ -21,7 +21,6 @@ import org.junit._
 import org.scalatest.junit.JUnitSuite
 import junit.framework.Assert._
 import java.nio.ByteBuffer
-import kafka.api._
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.cluster.Broker
 import collection.mutable._
@@ -83,8 +82,8 @@ object SerializationTestUtils{
   def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = {
     val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1)
     val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2)
-    val map = Map(((topic1, 0), leaderAndISR1),
-                  ((topic2, 0), leaderAndISR2))
+    val map = Map(((topic1, 0), PartitionStateInfo(leaderAndISR1, 3)),
+                  ((topic2, 0), PartitionStateInfo(leaderAndISR2, 3)))
     new LeaderAndIsrRequest(map)
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Thu Oct 11 23:30:23 2012
@@ -45,7 +45,7 @@ class HighwatermarkPersistenceTest exten
     replicaManager.checkpointHighWatermarks()
     var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
     assertEquals(0L, fooPartition0Hw)
-    val partition0 = replicaManager.getOrCreatePartition(topic, 0)
+    val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1)
     // create leader log
     val log0 = getMockLog
     // create leader and follower replicas
@@ -86,7 +86,7 @@ class HighwatermarkPersistenceTest exten
     replicaManager.checkpointHighWatermarks()
     var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
     assertEquals(0L, topic1Partition0Hw)
-    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
+    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1)
     // create leader log
     val topic1Log0 = getMockLog
     // create a local replica for topic1
@@ -102,7 +102,7 @@ class HighwatermarkPersistenceTest exten
     assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark)
     assertEquals(5L, topic1Partition0Hw)
     // add another partition and set highwatermark
-    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
+    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1)
     // create leader log
     val topic2Log0 = getMockLog
     // create a local replica for topic2

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Thu Oct 11 23:30:23 2012
@@ -81,7 +81,7 @@ class ISRExpirationTest extends JUnit3Su
                                                localLog: Log): Partition = {
     val leaderId=config.brokerId
     val replicaManager = new ReplicaManager(config, time, null, null, null)
-    val partition = replicaManager.getOrCreatePartition(topic, partitionId)
+    val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1)
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1397372&r1=1397371&r2=1397372&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Thu Oct 11 23:30:23 2012
@@ -237,7 +237,7 @@ class SimpleFetchTest extends JUnit3Suit
 
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
                                                localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = {
-    val partition = new Partition(topic, partitionId, time, replicaManager)
+    val partition = new Partition(topic, partitionId, 2, time, replicaManager)
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica


