kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch trunk updated: MINOR: Preserve the assignment order from the LeaderAndIsr request (#7010)
Date: Wed, 03 Jul 2019 03:27:42 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ea6cf41  MINOR: Preserve the assignment order from the LeaderAndIsr request (#7010)
ea6cf41 is described below

commit ea6cf41eb5135ec75f976ca769a16984c04a9c07
Author: José Armando García Sancio <jsancio@users.noreply.github.com>
AuthorDate: Tue Jul 2 20:27:22 2019 -0700

    MINOR: Preserve the assignment order from the LeaderAndIsr request (#7010)
    
    Leaders should update the assignment and the ISR together while processing a LeaderAndIsr request. The leader should also preserve the order of the assignment, mainly for consistency with the controller's code and data representation.
    
    Reviewers: Vikas Singh, David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 166 ++++++++++-----------
 .../main/scala/kafka/server/MetadataCache.scala    |   4 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   6 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 127 +++++++++-------
 .../server/HighwatermarkPersistenceTest.scala      |   9 +-
 ...xpirationTest.scala => IsrExpirationTest.scala} |  25 ++--
 .../unit/kafka/server/LogDirFailureTest.scala      |   2 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |   2 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  14 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  13 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |  20 +--
 ...chDrivenReplicationProtocolAcceptanceTest.scala |   2 +-
 12 files changed, 204 insertions(+), 186 deletions(-)
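
The heart of the change in core/src/main/scala/kafka/cluster/Partition.scala below is switching the replica assignment from a mutable Set to an ordered Seq, updated in the same step as the ISR. A standalone Scala sketch (example values only, not code from this patch) of why the collection type matters:

    // A Set discards the controller's replica ordering; a Seq preserves it.
    val fromRequest = Seq(3, 1, 2)                       // order chosen by the controller
    val asSet = scala.collection.mutable.Set(fromRequest: _*)
    println(asSet.toSeq)   // iteration order is unspecified; may not match the request
    println(fromRequest)   // List(3, 1, 2): the controller's order, preserved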

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 160c1d0..9ffce13 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,10 +16,9 @@
  */
 package kafka.cluster
 
-import java.util.{Optional, Properties}
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
 import com.yammer.metrics.core.Gauge
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.{Optional, Properties}
 import kafka.api.{ApiVersion, LeaderAndIsr, Request}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
@@ -39,7 +38,6 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
-
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq}
 
@@ -158,14 +156,14 @@ object Partition extends KafkaMetricsGroup {
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
  */
 class Partition(val topicPartition: TopicPartition,
-                private val replicaLagTimeMaxMs: Long,
-                private val interBrokerProtocolVersion: ApiVersion,
-                private val localBrokerId: Int,
-                private val time: Time,
-                private val stateStore: PartitionStateStore,
-                private val delayedOperations: DelayedOperations,
-                private val metadataCache: MetadataCache,
-                private val logManager: LogManager) extends Logging with KafkaMetricsGroup {
+                replicaLagTimeMaxMs: Long,
+                interBrokerProtocolVersion: ApiVersion,
+                localBrokerId: Int,
+                time: Time,
+                stateStore: PartitionStateStore,
+                delayedOperations: DelayedOperations,
+                metadataCache: MetadataCache,
+                logManager: LogManager) extends Logging with KafkaMetricsGroup {
 
   def topic: String = topicPartition.topic
   def partitionId: Int = topicPartition.partition
@@ -179,10 +177,9 @@ class Partition(val topicPartition: TopicPartition,
   // defined when this broker is leader for partition
   @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
   @volatile var leaderReplicaIdOpt: Option[Int] = None
-  @volatile var inSyncReplicas: Set[Int] = Set.empty[Int]
-  // Includes all valid broker ids (@see Request::isValidBrokerId) that contain logs. Doesn't contain
-  // future log replica id
-  @volatile var allReplicaIds: scala.collection.mutable.Set[Int] = scala.collection.mutable.Set(localBrokerId)
+  @volatile var inSyncReplicaIds = Set.empty[Int]
+  // An ordered sequence of all the valid broker ids that were assigned to this topic partition
+  @volatile var allReplicaIds = Seq.empty[Int]
 
   // Logs belonging to this partition. Most of the time there will be only one log, but if the log directory
   // is being changed (as a result of a ReplicaAlterLogDirs command), we may have two logs until the copy
@@ -214,7 +211,7 @@ class Partition(val topicPartition: TopicPartition,
   newGauge("InSyncReplicasCount",
     new Gauge[Int] {
       def value: Int = {
-        if (isLeader) inSyncReplicas.size else 0
+        if (isLeader) inSyncReplicaIds.size else 0
       }
     },
     tags
@@ -257,14 +254,14 @@ class Partition(val topicPartition: TopicPartition,
   )
 
   def isUnderReplicated: Boolean =
-    isLeader && inSyncReplicas.size < allReplicaIds.size
+    isLeader && inSyncReplicaIds.size < allReplicaIds.size
 
   def isUnderMinIsr: Boolean = {
-    leaderLogIfLocal.exists { inSyncReplicas.size < _.config.minInSyncReplicas }
+    leaderLogIfLocal.exists { inSyncReplicaIds.size < _.config.minInSyncReplicas }
   }
 
   def isAtMinIsr: Boolean = {
-    leaderLogIfLocal.exists { inSyncReplicas.size == _.config.minInSyncReplicas }
+    leaderLogIfLocal.exists { inSyncReplicaIds.size == _.config.minInSyncReplicas }
   }
 
   /**
@@ -301,19 +298,6 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  /**
-   * Creates a remote replica and puts that in a map. A future invocation to create
-   * replica with same id will return previously created object.
-   */
-  def getOrCreateReplica(replicaId: Int): Replica = {
-    require(replicaId != localBrokerId, s"Cannot create replica for local broker: $replicaId")
-    val newReplica = remoteReplicasMap.getAndMaybePut(replicaId, new Replica(replicaId, topicPartition))
-    allReplicaIds.add(replicaId)
-    require(remoteReplicasMap.size + 1 == allReplicaIds.size,
-      s"Invalid state. All Replica Ids: $allReplicaIds, remote replica ids: ${remoteReplicasMap.keys}")
-    newReplica
-  }
-
   def createLogIfNotExists(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
     isFutureReplica match {
       case true if futureLog.isEmpty =>
@@ -414,12 +398,6 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  // Visible for testing -- Only used in tests to add replica to existing partition
-  def addReplicaIfNotExists(replica: Replica): Replica = {
-    allReplicaIds.add(replica.brokerId)
-    remoteReplicasMap.getAndMaybePut(replica.brokerId, replica)
-  }
-
   // Visible for testing -- Used by unit tests to set log for this partition
   def setLog(log: Log, isFutureLog: Boolean): Unit = {
     if (isFutureLog)
@@ -431,12 +409,6 @@ class Partition(val topicPartition: TopicPartition,
   def remoteReplicas: Set[Replica] =
     remoteReplicasMap.values.toSet
 
-  private def removeReplica(replicaId: Int) {
-    require(replicaId != localBrokerId, s"Cannot remove replica for local broker: $replicaId")
-    allReplicaIds.remove(replicaId)
-    remoteReplicasMap.remove(replicaId)
-  }
-
   def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
     inReadLock(leaderIsrUpdateLock) {
       futureLog.exists(_.dir.getParent != newDestinationDir)
@@ -484,10 +456,10 @@ class Partition(val topicPartition: TopicPartition,
     // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
     inWriteLock(leaderIsrUpdateLock) {
       remoteReplicasMap.clear()
-      allReplicaIds = scala.collection.mutable.Set(localBrokerId)
+      allReplicaIds = Seq.empty
       log = None
       futureLog = None
-      inSyncReplicas = Set.empty[Int]
+      inSyncReplicaIds = Set.empty
       leaderReplicaIdOpt = None
       leaderEpochStartOffsetOpt = None
       Partition.removeMetrics(topicPartition)
@@ -512,16 +484,11 @@ class Partition(val topicPartition: TopicPartition,
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
-      // add replicas that are new
-      val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(_.toInt)
-      newInSyncReplicas.filter(_ != localBrokerId).foreach(getOrCreateReplica)
-      inSyncReplicas = newInSyncReplicas.toSet
-
-      // remove assigned replicas that have been removed by the controller
-      val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
-      (remoteReplicasMap.keys -- newAssignedReplicas).foreach(removeReplica)
-      newAssignedReplicas.filter(_ != localBrokerId).foreach(getOrCreateReplica)
 
+      updateAssignmentAndIsr(
+        assignment = partitionStateInfo.basePartitionState.replicas.asScala.iterator.map(_.toInt).toSeq,
+        isr = partitionStateInfo.basePartitionState.isr.asScala.iterator.map(_.toInt).toSet
+      )
       createLogIfNotExists(localBrokerId, partitionStateInfo.isNew, isFutureReplica = false, highWatermarkCheckpoints)
 
       val leaderLog = localLogOrException
@@ -546,7 +513,7 @@ class Partition(val topicPartition: TopicPartition,
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
       remoteReplicas.foreach { replica =>
-        val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica.brokerId)) curTimeMs else 0L
+        val lastCaughtUpTimeMs = if (inSyncReplicaIds.contains(replica.brokerId)) curTimeMs else 0L
         replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
       }
 
@@ -584,19 +551,18 @@ class Partition(val topicPartition: TopicPartition,
                    correlationId: Int,
                    highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
-      val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
       val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
       val oldLeaderEpoch = leaderEpoch
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
-      // add replicas that are new
-      newAssignedReplicas.filter(_ != localBrokerId).foreach(id => getOrCreateReplica(id))
+
+      updateAssignmentAndIsr(
+        assignment = partitionStateInfo.basePartitionState.replicas.asScala.iterator.map(_.toInt).toSeq,
+        isr = Set.empty[Int]
+      )
       createLogIfNotExists(localBrokerId, partitionStateInfo.isNew, isFutureReplica = false, highWatermarkCheckpoints)
 
-      // remove assigned replicas that have been removed by the controller
-      (remoteReplicasMap.keys -- newAssignedReplicas).foreach(removeReplica)
-      inSyncReplicas = Set.empty[Int]
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       leaderEpochStartOffsetOpt = None
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
@@ -654,6 +620,30 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
+   * Stores the topic partition assignment and ISR.
+   * It creates a new Replica object for any new remote broker. The isr parameter is
+   * expected to be a subset of the assignment parameter.
+   *
+   * Note: public visibility for tests.
+   *
+   * @param assignment An ordered sequence of all the broker ids that were assigned to this
+   *                   topic partition
+   * @param isr The set of broker ids that are known to be insync with the leader
+   */
+  def updateAssignmentAndIsr(assignment: Seq[Int], isr: Set[Int]): Unit = {
+    val replicaSet = assignment.toSet
+    val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
+    assignment
+      .filter(_ != localBrokerId)
+      .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
+    removedReplicas.foreach(remoteReplicasMap.remove)
+    allReplicaIds = assignment
+
+    inSyncReplicaIds = isr
+  }
+
+  /**
    * Check and maybe expand the ISR of the partition.
    * A replica will be added to ISR if its LEO >= current hw of the partition and it is caught up to
    * an offset within the current leader epoch. A replica must be caught up to the current leader
@@ -675,13 +665,13 @@ class Partition(val topicPartition: TopicPartition,
       leaderLogIfLocal match {
         case Some(leaderLog) =>
           val leaderHighwatermark = leaderLog.highWatermark
-          if (!inSyncReplicas.contains(followerReplica.brokerId) && isFollowerInSync(followerReplica, leaderHighwatermark)) {
-            val newInSyncReplicas = inSyncReplicas + followerReplica.brokerId
-            info(s"Expanding ISR from ${inSyncReplicas.mkString(",")} " +
-              s"to ${newInSyncReplicas.mkString(",")}")
+          if (!inSyncReplicaIds.contains(followerReplica.brokerId) && isFollowerInSync(followerReplica, leaderHighwatermark)) {
+            val newInSyncReplicaIds = inSyncReplicaIds + followerReplica.brokerId
+            info(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} " +
+              s"to ${newInSyncReplicaIds.mkString(",")}")
 
             // update ISR in ZK and cache
-            expandIsr(newInSyncReplicas)
+            expandIsr(newInSyncReplicaIds)
           }
           // check if the HW of the partition can now be incremented
           // since the replica may already be in the ISR and its LEO has just incremented
@@ -708,14 +698,14 @@ class Partition(val topicPartition: TopicPartition,
     leaderLogIfLocal match {
       case Some(leaderLog) =>
         // keep the current immutable replica list reference
-        val curInSyncReplicas = inSyncReplicas
+        val curInSyncReplicaIds = inSyncReplicaIds
 
         if (isTraceEnabled) {
           def logEndOffsetString: ((Int, Long)) => String = {
             case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset"
           }
 
-          val curInSyncReplicaObjects = (curInSyncReplicas - localBrokerId).map(getReplicaOrException)
+          val curInSyncReplicaObjects = (curInSyncReplicaIds - localBrokerId).map(getReplicaOrException)
           val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.logEndOffset))
           val localLogInfo = (localBrokerId, localLogOrException.logEndOffset)
           val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition { _._2 >= requiredOffset}
@@ -731,7 +721,7 @@ class Partition(val topicPartition: TopicPartition,
            * The topic may be configured not to accept messages if there are not enough replicas in ISR
            * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
            */
-          if (minIsr <= curInSyncReplicas.size)
+          if (minIsr <= curInSyncReplicaIds.size)
             (true, Errors.NONE)
           else
             (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
@@ -762,7 +752,7 @@ class Partition(val topicPartition: TopicPartition,
    */
   private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
     val replicaLogEndOffsets = remoteReplicas.filter { replica =>
-      curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicas.contains(replica.brokerId)
+      curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicaIds.contains(replica.brokerId)
     }.map(_.logEndOffsetMetadata)
     val newHighWatermark = (replicaLogEndOffsets + leaderLog.logEndOffsetMetadata).min(new LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderLog.highWatermarkMetadata
@@ -796,7 +786,7 @@ class Partition(val topicPartition: TopicPartition,
     if (!isLeader)
       throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
     val logStartOffsets = remoteReplicas.collect {
-      case replica if metadataCache.isBrokerAlive(replica.brokerId) => replica.logStartOffset
+      case replica if metadataCache.getAliveBroker(replica.brokerId).nonEmpty => replica.logStartOffset
     } + localLogOrException.logStartOffset
 
     futureLog match {
@@ -816,23 +806,23 @@ class Partition(val topicPartition: TopicPartition,
     val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
       leaderLogIfLocal match {
         case Some(leaderLog) =>
-          val outOfSyncReplicas = getOutOfSyncReplicas(replicaMaxLagTimeMs)
-          if (outOfSyncReplicas.nonEmpty) {
-            val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
-            assert(newInSyncReplicas.nonEmpty)
+          val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaMaxLagTimeMs)
+          if (outOfSyncReplicaIds.nonEmpty) {
+            val newInSyncReplicaIds = inSyncReplicaIds -- outOfSyncReplicaIds
+            assert(newInSyncReplicaIds.nonEmpty)
             info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, endOffset: %d). Out of sync replicas: %s."
-              .format(inSyncReplicas.mkString(","),
-                newInSyncReplicas.mkString(","),
+              .format(inSyncReplicaIds.mkString(","),
+                newInSyncReplicaIds.mkString(","),
                 leaderLog.highWatermark,
                 leaderLog.logEndOffset,
-                outOfSyncReplicas.map { replicaId =>
+                outOfSyncReplicaIds.map { replicaId =>
                   s"(brokerId: $replicaId, endOffset: ${getReplicaOrException(replicaId).logEndOffset})"
                 }.mkString(" ")
               )
             )
 
             // update ISR in zk and in cache
-            shrinkIsr(newInSyncReplicas)
+            shrinkIsr(newInSyncReplicaIds)
 
             // we may need to increment high watermark since ISR could be down to 1
             maybeIncrementLeaderHW(leaderLog)
@@ -871,10 +861,10 @@ class Partition(val topicPartition: TopicPartition,
      * is violated, that replica is considered to be out of sync
      *
      **/
-    val candidateReplicas = inSyncReplicas - localBrokerId
+    val candidateReplicaIds = inSyncReplicaIds - localBrokerId
     val currentTimeMs = time.milliseconds()
     val leaderEndOffset = localLogOrException.logEndOffset
-    candidateReplicas.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
+    candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
   }
 
   private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Option[LogAppendInfo] = {
@@ -924,11 +914,11 @@ class Partition(val topicPartition: TopicPartition,
       leaderLogIfLocal match {
         case Some(leaderLog) =>
           val minIsr = leaderLog.config.minInSyncReplicas
-          val inSyncSize = inSyncReplicas.size
+          val inSyncSize = inSyncReplicaIds.size
 
           // Avoid writing to leader if there are not enough insync replicas to make it safe
           if (inSyncSize < minIsr && requiredAcks == -1) {
-            throw new NotEnoughReplicasException(s"The size of the current ISR $inSyncReplicas " +
+            throw new NotEnoughReplicasException(s"The size of the current ISR $inSyncReplicaIds " +
               s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
           }
 
@@ -1179,7 +1169,7 @@ class Partition(val topicPartition: TopicPartition,
   private def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: Option[Int]): Unit = {
     zkVersionOpt match {
       case Some(newVersion) =>
-        inSyncReplicas = isr
+        inSyncReplicaIds = isr
         zkVersion = newVersion
         info("ISR updated to [%s] and zkVersion updated to [%d]".format(isr.mkString(","), zkVersion))
 
@@ -1201,8 +1191,8 @@ class Partition(val topicPartition: TopicPartition,
     partitionString.append("Topic: " + topic)
     partitionString.append("; Partition: " + partitionId)
     partitionString.append("; Leader: " + leaderReplicaIdOpt)
-    partitionString.append("; AllReplicas: " + allReplicaIds.mkString(","))
-    partitionString.append("; InSyncReplicas: " + inSyncReplicas.mkString(","))
+    partitionString.append("; AllReplicaIds: " + allReplicaIds.mkString(","))
+    partitionString.append("; InSyncReplicaIds: " + inSyncReplicaIds.mkString(","))
     partitionString.toString
   }
 }
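
With getOrCreateReplica, addReplicaIfNotExists, and removeReplica removed, both the broker and the tests seed replica state through the single public method added above. A minimal sketch of the new call pattern (broker ids are example values; per the scaladoc, the isr must be a subset of the assignment):

    partition.updateAssignmentAndIsr(
      assignment = Seq(0, 1, 2),  // ordered broker ids, as sent in the LeaderAndIsr request
      isr = Set(0, 1)
    )
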
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 647ccb1..2ec84f2 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -158,8 +158,8 @@ class MetadataCache(brokerId: Int) extends Logging {
     topics -- metadataSnapshot.partitionStates.keySet
   }
 
-  def isBrokerAlive(brokerId: Int): Boolean = {
-    metadataSnapshot.aliveBrokers.contains(brokerId)
+  def getAliveBroker(brokerId: Int): Option[Broker] = {
+    metadataSnapshot.aliveBrokers.get(brokerId)
   }
 
   def getAliveBrokers: Seq[Broker] = {
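
Call sites that only needed a boolean liveness check migrate to an Option test, as the Partition.scala hunk earlier in this diff shows. A one-line sketch of the pattern:

    // was: metadataCache.isBrokerAlive(brokerId)
    val brokerIsAlive: Boolean = metadataCache.getAliveBroker(brokerId).nonEmpty
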
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 71f38c2..07b0928 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -28,7 +28,7 @@ import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
+import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
@@ -816,7 +816,7 @@ class ReplicaManager(val config: KafkaConfig,
                     fetchMaxBytes: Int,
                     hardMaxBytesLimit: Boolean,
                     fetchInfos: Seq[(TopicPartition, PartitionData)],
-                    quota: ReplicaQuota = UnboundedQuota,
+                    quota: ReplicaQuota,
                     responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
                     isolationLevel: IsolationLevel) {
     val isFromFollower = Request.isValidBrokerId(replicaId)
@@ -1005,7 +1005,7 @@ class ReplicaManager(val config: KafkaConfig,
    *  the quota is exceeded and the replica is not in sync.
    */
   def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = {
-    val isReplicaInSync = nonOfflinePartition(topicPartition).exists(_.inSyncReplicas.contains(replicaId))
+    val isReplicaInSync = nonOfflinePartition(topicPartition).exists(_.inSyncReplicaIds.contains(replicaId))
     !isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 9989fc0..4f2d319 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -468,7 +468,7 @@ class PartitionTest {
     val follower1 = brokerId + 1
     val follower2 = brokerId + 2
     val controllerId = brokerId + 3
-    val replicas = List[Integer](leader, follower1, follower2).asJava
+    val replicas = List(leader, follower1, follower2)
     val isr = List[Integer](leader, follower2).asJava
     val leaderEpoch = 8
     val batch1 = TestUtils.records(records = List(
@@ -478,12 +478,14 @@ class PartitionTest {
       new SimpleRecord(20,"k4".getBytes, "v2".getBytes),
       new SimpleRecord(21,"k5".getBytes, "v3".getBytes)))
 
-    val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true)
+    val leaderState = new LeaderAndIsrRequest.PartitionState(
+      controllerEpoch, leader, leaderEpoch, isr, 1, replicas.map(Int.box).asJava, true
+    )
 
     assertTrue("Expected first makeLeader() to return 'leader changed'",
       partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
     assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
-    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas)
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
 
     // after makeLeader() call, partition should know about all the replicas
     // append records with initial leader epoch
@@ -546,13 +548,15 @@ class PartitionTest {
     assertEquals(Right(None), fetchOffsetsForTimestamp(30, Some(IsolationLevel.READ_UNCOMMITTED)))
 
     // Make into a follower
-    val followerState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2,
-      leaderEpoch + 1, isr, 4, replicas, false)
+    val followerState = new LeaderAndIsrRequest.PartitionState(
+      controllerEpoch, follower2, leaderEpoch + 1, isr, 4, replicas.map(Int.box).asJava, false
+    )
     assertTrue(partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints))
 
     // Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition
-    val newLeaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 5,
-      replicas, false)
+    val newLeaderState = new LeaderAndIsrRequest.PartitionState(
+      controllerEpoch, leader, leaderEpoch + 2, isr, 5, replicas.map(Int.box).asJava, false
+    )
     assertTrue(partition.makeLeader(controllerId, newLeaderState, 2, offsetCheckpoints))
 
     // Try to get offsets as a client
@@ -752,15 +756,6 @@ class PartitionTest {
     assertThrows[ReplicaNotAvailableException] {
       partition.localLogOrException
     }
-
-    assertThrows[IllegalArgumentException] {
-      partition.getOrCreateReplica(brokerId)
-    }
-
-    val remoteReplicaId = brokerId + 1;
-    val replica = partition.getOrCreateReplica(remoteReplicaId)
-    assertEquals(replica.brokerId, remoteReplicaId)
-    assertEquals(replica.topicPartition, partition.topicPartition)
   }
 
   @Test
@@ -775,17 +770,17 @@ class PartitionTest {
   def testMakeFollowerWithNoLeaderIdChange(): Unit = {
     // Start off as follower
     var partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 1,
-      List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+      List[Integer](0, 1, 2, brokerId).asJava, 1, List[Integer](0, 1, 2, brokerId).asJava, false)
     partition.makeFollower(0, partitionStateInfo, 0, offsetCheckpoints)
 
     // Request with same leader and epoch increases by only 1, do become-follower steps
     partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4,
-      List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+      List[Integer](0, 1, 2, brokerId).asJava, 1, List[Integer](0, 1, 2, brokerId).asJava, false)
     assertTrue(partition.makeFollower(0, partitionStateInfo, 2, offsetCheckpoints))
 
     // Request with same leader and same epoch, skip become-follower steps
     partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4,
-      List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+      List[Integer](0, 1, 2, brokerId).asJava, 1, List[Integer](0, 1, 2, brokerId).asJava, false)
     assertFalse(partition.makeFollower(0, partitionStateInfo, 2, offsetCheckpoints))
   }
 
@@ -811,7 +806,7 @@ class PartitionTest {
     assertTrue("Expected first makeLeader() to return 'leader changed'",
                partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
     assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
-    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas)
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
 
     // after makeLeader() call, partition should know about all the replicas
     // append records with initial leader epoch
@@ -851,14 +846,14 @@ class PartitionTest {
     // fetch from follower not in ISR from log start offset should not add this follower to ISR
     updateFollowerFetchState(follower1, LogOffsetMetadata(0))
     updateFollowerFetchState(follower1, LogOffsetMetadata(lastOffsetOfFirstBatch))
-    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas)
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
 
     // fetch from the follower not in ISR from start offset of the current leader epoch should
     // add this follower to ISR
     when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch + 2,
       List(leader, follower2, follower1), 1))).thenReturn(Some(2))
     updateFollowerFetchState(follower1, LogOffsetMetadata(currentLeaderEpochStartOffset))
-    assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas)
+    assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicaIds)
   }
 
   /**
@@ -1055,16 +1050,23 @@ class PartitionTest {
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
-    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+    val replicas = List(brokerId, remoteBrokerId)
     val isr = List[Integer](brokerId).asJava
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
-    assertTrue("Expected become leader transition to succeed",
-      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
-        leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId), partition.inSyncReplicas)
+    assertTrue(
+      "Expected become leader transition to succeed",
+      partition.makeLeader(
+        controllerId,
+        new LeaderAndIsrRequest.PartitionState(
+          controllerEpoch, brokerId, leaderEpoch, isr, 1, replicas.map(Int.box).asJava, true),
+        0,
+        offsetCheckpoints
+      )
+    )
+    assertEquals(Set(brokerId), partition.inSyncReplicaIds)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
@@ -1076,7 +1078,7 @@ class PartitionTest {
       followerFetchTimeMs = time.milliseconds(),
       leaderEndOffset = 6L)
 
-    assertEquals(Set(brokerId), partition.inSyncReplicas)
+    assertEquals(Set(brokerId), partition.inSyncReplicaIds)
     assertEquals(3L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
 
@@ -1094,7 +1096,7 @@ class PartitionTest {
       followerFetchTimeMs = time.milliseconds(),
       leaderEndOffset = 6L)
 
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(10L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
   }
@@ -1117,7 +1119,7 @@ class PartitionTest {
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
         leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId), partition.inSyncReplicas)
+    assertEquals(Set(brokerId), partition.inSyncReplicaIds)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
@@ -1138,7 +1140,7 @@ class PartitionTest {
       leaderEndOffset = 10L)
 
     // Follower state is updated, but the ISR has not expanded
-    assertEquals(Set(brokerId), partition.inSyncReplicas)
+    assertEquals(Set(brokerId), partition.inSyncReplicaIds)
     assertEquals(10L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
   }
@@ -1152,17 +1154,24 @@ class PartitionTest {
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
-    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+    val replicas = List(brokerId, remoteBrokerId)
     val isr = List[Integer](brokerId, remoteBrokerId).asJava
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
     partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
-    assertTrue("Expected become leader transition to succeed",
-      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
-        leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertTrue(
+      "Expected become leader transition to succeed",
+      partition.makeLeader(
+        controllerId,
+        new LeaderAndIsrRequest.PartitionState(
+          controllerEpoch, brokerId, leaderEpoch, isr, 1, replicas.map(Int.box).asJava, true),
+        0,
+        offsetCheckpoints
+      )
+    )
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(0L, partition.localLogOrException.highWatermark)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
@@ -1172,7 +1181,7 @@ class PartitionTest {
 
     // On initialization, the replica is considered caught up and should not be removed
     partition.maybeShrinkIsr(10000)
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
 
     // If enough time passes without a fetch update, the ISR should shrink
     time.sleep(10001)
@@ -1184,7 +1193,7 @@ class PartitionTest {
     when(stateStore.shrinkIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(Some(2))
 
     partition.maybeShrinkIsr(10000)
-    assertEquals(Set(brokerId), partition.inSyncReplicas)
+    assertEquals(Set(brokerId), partition.inSyncReplicaIds)
     assertEquals(10L, partition.localLogOrException.highWatermark)
   }
 
@@ -1197,17 +1206,24 @@ class PartitionTest {
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
-    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+    val replicas = List(brokerId, remoteBrokerId)
     val isr = List[Integer](brokerId, remoteBrokerId).asJava
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
     partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
-    assertTrue("Expected become leader transition to succeed",
-      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
-        leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertTrue(
+      "Expected become leader transition to succeed",
+      partition.makeLeader(
+        controllerId,
+        new LeaderAndIsrRequest.PartitionState(
+          controllerEpoch, brokerId, leaderEpoch, isr, 1, replicas.map(Int.box).asJava, true),
+        0,
+        offsetCheckpoints
+      )
+    )
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(0L, partition.localLogOrException.highWatermark)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
@@ -1245,7 +1261,7 @@ class PartitionTest {
     // The ISR should not be shrunk because the follower has caught up with the leader at the
     // time of the first fetch.
     partition.maybeShrinkIsr(10000)
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
   }
 
   @Test
@@ -1257,17 +1273,24 @@ class PartitionTest {
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
-    val replicas = List[Integer](brokerId, remoteBrokerId).asJava
+    val replicas = List(brokerId, remoteBrokerId)
     val isr = List[Integer](brokerId, remoteBrokerId).asJava
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
     partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
-    assertTrue("Expected become leader transition to succeed",
-      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
-        leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertTrue(
+      "Expected become leader transition to succeed",
+      partition.makeLeader(
+        controllerId,
+        new LeaderAndIsrRequest.PartitionState(
+          controllerEpoch, brokerId, leaderEpoch, isr, 1, replicas.map(Int.box).asJava, true),
+        0,
+        offsetCheckpoints
+      )
+    )
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(0L, partition.localLogOrException.highWatermark)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
@@ -1291,7 +1314,7 @@ class PartitionTest {
 
     // The ISR should not be shrunk because the follower is caught up to the leader's log end
     partition.maybeShrinkIsr(10000)
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
   }
 
   @Test
@@ -1313,7 +1336,7 @@ class PartitionTest {
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
         leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints))
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(0L, partition.localLogOrException.highWatermark)
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
@@ -1332,7 +1355,7 @@ class PartitionTest {
     when(stateStore.shrinkIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(None)
 
     partition.maybeShrinkIsr(10000)
-    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicas)
+    assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(0L, partition.localLogOrException.highWatermark)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 92820d6..3eeb064 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Utils
 import org.easymock.EasyMock
 import org.junit._
 import org.junit.Assert._
-import kafka.cluster.Replica
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import java.util.concurrent.atomic.AtomicBoolean
@@ -76,8 +75,12 @@ class HighwatermarkPersistenceTest {
       // create leader and follower replicas
       val log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic, 0), LogConfig())
       partition0.setLog(log0, isFutureLog = false)
-      val followerReplicaPartition0 = new Replica(configs.last.brokerId, tp0)
-      partition0.addReplicaIfNotExists(followerReplicaPartition0)
+
+      partition0.updateAssignmentAndIsr(
+        assignment = Seq(configs.head.brokerId, configs.last.brokerId),
+        isr = Set(configs.head.brokerId)
+      )
+
       replicaManager.checkpointHighWatermarks()
       fooPartition0Hw = hwmFor(replicaManager, topic, 0)
       assertEquals(log0.highWatermark, fooPartition0Hw)
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
similarity index 93%
rename from core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
rename to core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 2fe67f7..c258996 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -20,7 +20,7 @@ import java.io.File
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.cluster.{Partition, Replica}
+import kafka.cluster.Partition
 import kafka.log.{Log, LogManager}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -77,7 +77,7 @@ class IsrExpirationTest {
 
     // create one partition and all replicas
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
-    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicaIds)
 
     // let the follower catch up to the Leader logEndOffset - 1
     for (replica <- partition0.remoteReplicas)
@@ -107,7 +107,7 @@ class IsrExpirationTest {
 
     // create one partition and all replicas
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
-    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicaIds)
 
     // Let enough time pass for the replica to be considered stuck
     time.sleep(150)
@@ -127,7 +127,7 @@ class IsrExpirationTest {
     val log = logMock
     // add one partition
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
-    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicaIds)
     // Make the remote replica not read to the end of the log. It should not be out of sync for at least 100 ms
     for (replica <- partition0.remoteReplicas)
       replica.updateFetchState(
@@ -182,7 +182,7 @@ class IsrExpirationTest {
 
     // create one partition and all replicas
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
-    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicaIds)
 
     // let the follower catch up to the Leader logEndOffset
     for (replica <- partition0.remoteReplicas)
@@ -211,10 +211,11 @@ class IsrExpirationTest {
     val partition = replicaManager.createPartition(tp)
     partition.setLog(localLog, isFutureLog = false)
 
-    val allReplicas = getFollowerReplicas(partition, leaderId, time)
-    allReplicas.foreach(r => partition.addReplicaIfNotExists(r))
-    // set in sync replicas for this partition to all the assigned replicas
-    partition.inSyncReplicas = allReplicas.map(_.brokerId).toSet + leaderId
+    partition.updateAssignmentAndIsr(
+      assignment = configs.map(_.brokerId),
+      isr = configs.map(_.brokerId).toSet
+    )
+
     // set lastCaughtUpTime to current time
     for (replica <- partition.remoteReplicas)
       replica.updateFetchState(
@@ -236,10 +237,4 @@ class IsrExpirationTest {
     EasyMock.replay(log)
     log
   }
-
-  private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
-    configs.filter(_.brokerId != leaderId).map { config =>
-      new Replica(config.brokerId, partition.topicPartition)
-    }
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 03ce3a5..90d5f78 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -121,7 +121,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     producer.send(record).get
 
     assertEquals(brokerCount, leaderServer.replicaManager.nonOfflinePartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader))
-      .get.inSyncReplicas.size)
+      .get.inSyncReplicaIds.size)
     followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread =>
       assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete)
     }
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index e36b371..cd76bca 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -149,7 +149,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
       * is that server1 has caught up on the topicPartition, and has joined the ISR.
       * In the line below, we wait until the condition is met before shutting down server2
       */
-    waitUntilTrue(() => server2.replicaManager.nonOfflinePartition(topicPartition).get.inSyncReplicas.size == 2,
+    waitUntilTrue(() => server2.replicaManager.nonOfflinePartition(topicPartition).get.inSyncReplicaIds.size == 2,
       "Server 1 is not able to join the ISR after restart")
 
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 9e39b5f..48c23e4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -20,7 +20,7 @@ import java.io.File
 import java.util.{Optional, Properties}
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.cluster.{Partition, Replica}
+import kafka.cluster.Partition
 import kafka.log.{Log, LogManager, LogOffsetSnapshot}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
@@ -246,14 +246,10 @@ class ReplicaManagerQuotasTest {
       partition.leaderReplicaIdOpt = Some(leaderBrokerId)
       partition.setLog(log, isFutureLog = false)
 
-      val followerReplica = new Replica(configs.last.brokerId, p)
-      val allReplicas : Set[Int] = Set(leaderBrokerId, followerReplica.brokerId)
-      partition.addReplicaIfNotExists(followerReplica)
-      if (bothReplicasInSync) {
-        partition.inSyncReplicas = allReplicas
-      } else {
-        partition.inSyncReplicas = Set(leaderBrokerId)
-      }
+      partition.updateAssignmentAndIsr(
+        assignment = Seq(leaderBrokerId, configs.last.brokerId),
+        isr = if (bothReplicasInSync) Set(leaderBrokerId, configs.last.brokerId) else Set(leaderBrokerId)
+      )
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0ed7ff6..87f2894 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -26,6 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
 import kafka.cluster.BrokerEndPoint
+import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.LazyOffsetCheckpoints
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.timer.MockTimer
@@ -512,6 +513,7 @@ class ReplicaManagerTest {
         fetchMaxBytes = maxFetchBytes,
         hardMaxBytesLimit = false,
         fetchInfos = Seq(tp -> validFetchPartitionData),
+        quota = UnboundedQuota,
         isolationLevel = IsolationLevel.READ_UNCOMMITTED,
         responseCallback = callback
       )
@@ -533,6 +535,7 @@ class ReplicaManagerTest {
         fetchMaxBytes = maxFetchBytes,
         hardMaxBytesLimit = false,
         fetchInfos = Seq(tp -> invalidFetchPartitionData),
+        quota = UnboundedQuota,
         isolationLevel = IsolationLevel.READ_UNCOMMITTED,
         responseCallback = callback
       )
@@ -612,6 +615,7 @@ class ReplicaManagerTest {
         fetchInfos = Seq(
           tp0 -> new PartitionData(1, 0, 100000, Optional.empty()),
           tp1 -> new PartitionData(1, 0, 100000, Optional.empty())),
+        quota = UnboundedQuota,
         responseCallback = fetchCallback,
         isolationLevel = IsolationLevel.READ_UNCOMMITTED
       )
@@ -741,7 +745,9 @@ class ReplicaManagerTest {
     val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes
     aliveBrokerIds.foreach { brokerId =>
-      EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes
+      EasyMock.expect(metadataCache.getAliveBroker(EasyMock.eq(brokerId)))
+        .andReturn(Option(createBroker(brokerId, s"host$brokerId", brokerId)))
+        .anyTimes
     }
     EasyMock.replay(metadataCache)
 
@@ -887,6 +893,7 @@ class ReplicaManagerTest {
       fetchMaxBytes = Int.MaxValue,
       hardMaxBytesLimit = false,
       fetchInfos = Seq(partition -> partitionData),
+      quota = UnboundedQuota,
       responseCallback = fetchCallback,
       isolationLevel = isolationLevel)
 
@@ -903,7 +910,9 @@ class ReplicaManagerTest {
     val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
     aliveBrokerIds.foreach { brokerId =>
-      EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes()
+      EasyMock.expect(metadataCache.getAliveBroker(EasyMock.eq(brokerId)))
+        .andReturn(Option(createBroker(brokerId, s"host$brokerId", brokerId)))
+        .anyTimes()
     }
     EasyMock.replay(metadataCache)
 
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1f3179c..bae1171 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -20,8 +20,8 @@ import java.io.File
 
 import kafka.api._
 import kafka.utils._
-import kafka.cluster.Replica
-import kafka.log.{Log, LogManager}
+import kafka.log.Log
+import kafka.log.LogManager
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.metrics.Metrics
@@ -83,6 +83,7 @@ class SimpleFetchTest {
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLEO)).anyTimes()
+    EasyMock.expect(log.highWatermarkMetadata).andReturn(LogOffsetMetadata(partitionHW)).anyTimes()
     EasyMock.expect(log.highWatermark).andReturn(partitionHW).anyTimes()
     EasyMock.expect(log.lastStableOffset).andReturn(partitionHW).anyTimes()
     EasyMock.expect(log.read(
@@ -127,18 +128,19 @@ class SimpleFetchTest {
     partition.setLog(log, false)
 
     // create the follower replica with defined log end offset
-    val followerReplica= new Replica(configs(1).brokerId, partition.topicPartition)
+    val followerId = configs(1).brokerId
+    val allReplicas = Seq(configs.head.brokerId, followerId)
+    partition.updateAssignmentAndIsr(
+      assignment = allReplicas,
+      isr = allReplicas.toSet
+    )
     val leo = LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
-    followerReplica.updateFetchState(
+    partition.updateFollowerFetchState(
+      followerId,
       followerFetchOffsetMetadata = leo,
       followerStartOffset = 0L,
+      followerFetchTimeMs = time.milliseconds,
       leaderEndOffset = leo.messageOffset)
-    partition.addReplicaIfNotExists(followerReplica)
-
-    // add both of them to ISR
-    val allReplicas = List(configs.head.brokerId, followerReplica.brokerId)
-    partition.inSyncReplicas = allReplicas.toSet
   }
 
   @After
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 3e48109..01ba69f 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -444,7 +444,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
   private def awaitISR(tp: TopicPartition): Unit = {
     TestUtils.waitUntilTrue(() => {
-      leader.replicaManager.nonOfflinePartition(tp).get.inSyncReplicas.size == 2
+      leader.replicaManager.nonOfflinePartition(tp).get.inSyncReplicaIds.size == 2
     }, "Timed out waiting for replicas to join ISR")
   }
 

