kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject git commit: KAFKA-1170 ISR can be inconsistent during partition reassignment for low throughput partitions; reviewed by Jun Rao and Guozhang Wang
Date Sat, 07 Dec 2013 00:27:00 GMT
Updated Branches:
  refs/heads/trunk 876cfdb59 -> 32aae7202


KAFKA-1170 ISR can be inconsistent during partition reassignment for low throughput partitions;
reviewed by Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32aae720
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32aae720
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32aae720

Branch: refs/heads/trunk
Commit: 32aae7202ce041128fb1f6d2ea43580ee7864d74
Parents: 876cfdb
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Fri Dec 6 16:26:53 2013 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Fri Dec 6 16:26:53 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  10 +-
 .../controller/ControllerChannelManager.scala   |  10 +-
 .../kafka/controller/KafkaController.scala      |   2 +-
 .../kafka/controller/ReplicaStateMachine.scala  |  29 ++----
 .../main/scala/kafka/server/KafkaConfig.scala   |   5 +-
 .../scala/kafka/server/ReplicaManager.scala     | 102 +++++++++++--------
 .../test/scala/unit/kafka/admin/AdminTest.scala |  27 +++--
 .../unit/kafka/server/ISRExpirationTest.scala   |   1 +
 .../unit/kafka/server/LogRecoveryTest.scala     |   1 +
 .../unit/kafka/server/SimpleFetchTest.scala     |   1 +
 10 files changed, 106 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/32aae720/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 02ccc17..13f48ba 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -310,19 +310,15 @@ class Partition(val topic: String,
   def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
     /**
      * there are two cases that need to be handled here -
-     * 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated
-     *                     for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR
+     * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms,
+     *                     the follower is stuck and should be removed from the ISR
     * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
      *                     follower is not catching up and should be removed from the ISR
      **/
     val leaderLogEndOffset = leaderReplica.logEndOffset
     val candidateReplicas = inSyncReplicas - leaderReplica
     // Case 1 above
-    val possiblyStuckReplicas = candidateReplicas.filter(r => r.logEndOffset < leaderLogEndOffset)
-    if(possiblyStuckReplicas.size > 0)
-      debug("Possibly stuck replicas for partition [%s,%d] are %s".format(topic, partitionId,
-        possiblyStuckReplicas.map(_.brokerId).mkString(",")))
-    val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs))
+    val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
     if(stuckReplicas.size > 0)
       debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
     // Case 2 above
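
For readers skimming the patch, here is a minimal, self-contained Scala sketch of the revised out-of-sync check above. ReplicaView, the method signature, and the sample values are illustrative stand-ins rather than Kafka's actual Partition/Replica classes; only the two filter conditions mirror the patched code.

// Standalone sketch of the revised getOutOfSyncReplicas logic; ReplicaView is a stand-in type.
object StuckReplicaSketch {
  case class ReplicaView(brokerId: Int, logEndOffset: Long, logEndOffsetUpdateTimeMs: Long)

  def getOutOfSync(leader: ReplicaView, followers: Set[ReplicaView],
                   keepInSyncTimeMs: Long, keepInSyncMessages: Long, nowMs: Long): Set[ReplicaView] = {
    // Case 1 (stuck): the follower's log end offset has not been updated for keepInSyncTimeMs,
    // regardless of whether that offset currently matches the leader's.
    val stuck = followers.filter(r => (nowMs - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
    // Case 2 (slow): the follower trails the leader's log end offset by more than keepInSyncMessages.
    val slow = followers.filter(r => leader.logEndOffset - r.logEndOffset > keepInSyncMessages)
    stuck ++ slow
  }

  def main(args: Array[String]): Unit = {
    val now = 100000L
    val leader = ReplicaView(0, logEndOffset = 50, logEndOffsetUpdateTimeMs = now)
    // Caught up on offset but idle past the timeout: the old filter skipped it because its
    // offset equalled the leader's; the new filter flags it on update-time staleness alone.
    val idle  = ReplicaView(1, logEndOffset = 50, logEndOffsetUpdateTimeMs = now - 20000)
    val fresh = ReplicaView(2, logEndOffset = 48, logEndOffsetUpdateTimeMs = now - 50)
    println(getOutOfSync(leader, Set(idle, fresh), keepInSyncTimeMs = 10000, keepInSyncMessages = 4000, nowMs = now).map(_.brokerId)) // Set(1)
  }
}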

http://git-wip-us.apache.org/repos/asf/kafka/blob/32aae720/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index beca460..4c121e4 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -238,8 +238,9 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
       val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
       for (p <- partitionStateInfos) {
         val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
-        stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " +
-                                 "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, correlationId, broker,
+        stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
+                                 "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
+                                                                 p._2.leaderIsrAndControllerEpoch, correlationId, broker,
                                                                  p._1._1, p._1._2))
       }
       sendRequest(broker, leaderAndIsrRequest, null)
@@ -250,8 +251,9 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
       val partitionStateInfos = m._2.toMap
       val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId,
                                                             partitionStateInfos, controllerContext.liveOrShuttingDownBrokers)
-      partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request with " +
-        "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, correlationId, broker, p._1)))
+      partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
+        "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
+                                                                 correlationId, broker, p._1)))
+                                                                 correlationId, broker, p._1)))
       sendRequest(broker, updateMetadataRequest, null)
     }
     updateMetadataRequestMap.clear()

http://git-wip-us.apache.org/repos/asf/kafka/blob/32aae720/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3beaf75..fd92c65 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -812,7 +812,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
               info("New leader and ISR for partition %s is %s".format(topicAndPartition,
newLeaderAndIsr.toString()))
             updateSucceeded
           } else {
-            warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
+            warn("Cannot remove replica %d from ISR of partition %s since it is not in the ISR. Leader = %d ; ISR = %s"
                  .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
             controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)

http://git-wip-us.apache.org/repos/asf/kafka/blob/32aae720/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index c52225a..ad4ee53 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -171,24 +171,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           val leaderAndIsrIsEmpty: Boolean =
             controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
               case Some(currLeaderIsrAndControllerEpoch) =>
-                if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
-                  controller.removeReplicaFromIsr(topic, partition, replicaId) match {
-                    case Some(updatedLeaderIsrAndControllerEpoch) =>
-                      // send the shrunk ISR state change request only to the leader
-                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
-                        topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
-                      replicaState.put((topic, partition, replicaId), OfflineReplica)
-                      stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
-                                                .format(controllerId, controller.epoch, replicaId, topicAndPartition))
-                      false
-                    case None =>
-                      true
-                  }
-                else {
-                  replicaState.put((topic, partition, replicaId), OfflineReplica)
-                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
-                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
-                  false
+                controller.removeReplicaFromIsr(topic, partition, replicaId) match {
+                  case Some(updatedLeaderIsrAndControllerEpoch) =>
+                    // send the shrunk ISR state change request only to the leader
+                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
+                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
+                    replicaState.put((topic, partition, replicaId), OfflineReplica)
+                    stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
+                      .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+                    false
+                  case None =>
+                    true
                 }
               case None =>
                 true
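
The simplification above relies on the KafkaController change in the previous hunk: removeReplicaFromIsr now reports the current leader-and-ISR state back even when the replica is no longer in the ISR, so the state machine can drop its own containment check before marking the replica offline. Below is a hedged, standalone model of that contract; LeaderAndIsr, the in-memory state map, and the method body are simplifications for illustration, not the controller's real implementation.

// Simplified model of the removeReplicaFromIsr contract implied by the two hunks above.
object OfflineTransitionSketch {
  case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Stand-in for the controller's partition leadership cache / ZooKeeper state.
  private var partitionState = Map("my-topic-0" -> LeaderAndIsr(leader = 0, isr = List(0, 1, 2)))

  // Returns Some(state) whether or not the replica was still in the ISR; None only for unknown partitions.
  def removeReplicaFromIsr(partition: String, replicaId: Int): Option[LeaderAndIsr] =
    partitionState.get(partition).map { current =>
      val updated =
        if (current.isr.contains(replicaId)) current.copy(isr = current.isr.filterNot(_ == replicaId))
        else current // replica already absent: keep the state and just report it back
      partitionState += (partition -> updated)
      updated
    }

  def main(args: Array[String]): Unit = {
    println(removeReplicaFromIsr("my-topic-0", 2)) // Some(LeaderAndIsr(0,List(0, 1)))
    println(removeReplicaFromIsr("my-topic-0", 2)) // still Some(...), so the caller can proceed to OfflineReplica
    println(removeReplicaFromIsr("unknown-0", 2))  // None: the caller treats the leader/ISR as empty
  }
}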

http://git-wip-us.apache.org/repos/asf/kafka/blob/32aae720/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8f9db10..a7e5b73 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -198,8 +198,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the number of byes of messages to attempt to fetch */
   val replicaFetchMaxBytes = props.getIntInRange("replica.fetch.max.bytes", ConsumerConfig.FetchSize, (messageMaxBytes, Int.MaxValue))
 
-  /* max wait time for each fetcher request issued by follower replicas*/
+  /* max wait time for each fetcher request issued by follower replicas. This value should always be less than the
+  *  replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics */
   val replicaFetchWaitMaxMs = props.getInt("replica.fetch.wait.max.ms", 500)
+  require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be at least replica.lag.time.max.ms" +
+                                                        " to prevent frequent changes in ISR")
 
   /* minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
   val replicaFetchMinBytes = props.getInt("replica.fetch.min.bytes", 1)
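
The new require() above couples the follower fetch wait to the ISR lag timeout: on a low throughput partition the follower's fetch can legitimately sit idle for up to replica.fetch.wait.max.ms, so allowing that wait to exceed replica.lag.time.max.ms would let the leader shrink the ISR for healthy but idle followers. A minimal, self-contained sketch of the same validation follows; SketchConfig and its default values are placeholders, not Kafka's KafkaConfig.

// Standalone sketch of the added config constraint; property keys match the broker config,
// but SketchConfig and the default values here are illustrative only.
import java.util.Properties

object ReplicaTimingCheckSketch {
  class SketchConfig(props: Properties) {
    val replicaLagTimeMaxMs: Long  = props.getProperty("replica.lag.time.max.ms", "10000").toLong
    val replicaFetchWaitMaxMs: Int = props.getProperty("replica.fetch.wait.max.ms", "500").toInt
    // Mirrors the require() in the hunk above: reject configs where a blocked-but-healthy
    // follower fetch could outlast the ISR lag timeout.
    require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs,
      "replica.fetch.wait.max.ms must not exceed replica.lag.time.max.ms to prevent frequent ISR changes")
  }

  def main(args: Array[String]): Unit = {
    val bad = new Properties()
    bad.setProperty("replica.fetch.wait.max.ms", "20000") // larger than the lag timeout used here
    try new SketchConfig(bad)
    catch { case e: IllegalArgumentException => println("rejected: " + e.getMessage) }
  }
}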

http://git-wip-us.apache.org/repos/asf/kafka/blob/32aae720/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f9c7c29..b0a3962 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -204,17 +204,19 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
-    leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partition), stateInfo) =>
-      stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]"
-                                .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
-                                        leaderAndISRRequest.controllerEpoch, topic, partition))}
-    info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest))
-
+    leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
+      stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
+                                .format(localBrokerId, stateInfo.leaderIsrAndControllerEpoch, leaderAndISRRequest.correlationId,
+                                        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition))
+    }
     replicaStateChangeLock synchronized {
       val responseMap = new collection.mutable.HashMap[(String, Int), Short]
       if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-        stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d. Latest known controller epoch is %d"
-          .format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch))
+        leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
+        stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." +
+          " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId,
+                                                         leaderAndISRRequest.controllerEpoch, controllerEpoch))
+        }
         (responseMap, ErrorMapping.StaleControllerEpochCode)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
@@ -234,7 +236,7 @@ class ReplicaManager(val config: KafkaConfig,
             // Otherwise record the error code in response
             stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with
correlation id %d from " +
               "controller %d epoch %d with an older leader epoch %d for partition [%s,%d],
current leader epoch is %d")
-              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch,
+              .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch,
               partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic,
partition.partitionId, partitionLeaderEpoch))
             responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
           }
@@ -247,7 +249,6 @@ class ReplicaManager(val config: KafkaConfig,
         if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
         if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
 
-        info("Handled leader and isr request %s".format(leaderAndISRRequest))
         // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
         // have been completely populated before starting the checkpointing there by avoiding weird race conditions
         if (!hwThreadInitialized) {
@@ -274,10 +275,10 @@ class ReplicaManager(val config: KafkaConfig,
   private def makeLeaders(controllerId: Int, epoch: Int,
                           partitionState: Map[Partition, PartitionStateInfo],
                          correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = {
-    stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-      "starting the become-leader transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch,
-      partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
+    partitionState.foreach(state =>
+      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+        "starting the become-leader transition for partition %s")
+        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
 
     for (partition <- partitionState.keys)
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
@@ -285,9 +286,11 @@ class ReplicaManager(val config: KafkaConfig,
     try {
       // First stop fetchers for all the partitions
       replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
-      stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
-
+      partitionState.foreach { state =>
+        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
+          "%d epoch %d with correlation id %d for partition %s")
+          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
+      }
       // Update the partition information to be the leader
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         partition.makeLeader(controllerId, partitionStateInfo, correlationId)}
@@ -298,17 +301,21 @@ class ReplicaManager(val config: KafkaConfig,
       }
     } catch {
       case e: Throwable =>
-        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
-          "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
-        stateChangeLogger.error(errorMsg, e)
+        partitionState.foreach { state =>
+          val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
+            "epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch,
+                                                TopicAndPartition(state._1.topic, state._1.partitionId))
+          stateChangeLogger.error(errorMsg, e)
+        }
         // Re-throw the exception for it to be caught in KafkaApis
         throw e
     }
 
-    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-      "for the become-leader transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch,
-      partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
+    partitionState.foreach { state =>
+      stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+        "for the become-leader transition for partition %s")
+        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+    }
   }
 
   /*
@@ -329,10 +336,10 @@ class ReplicaManager(val config: KafkaConfig,
    */
  private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
                            leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
-    stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-      "starting the become-follower transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch,
-      partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
+    partitionState.foreach(state =>
+      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+        "starting the become-follower transition for partition %s")
+        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
 
     for (partition <- partitionState.keys)
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
@@ -346,15 +353,20 @@ class ReplicaManager(val config: KafkaConfig,
         partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
 
       replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
-      stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
+      partitionState.foreach { state =>
+        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
+          "%d epoch %d with correlation id %d for partition %s")
+          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
+      }
 
       logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) =>
         new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
       })
-      stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
-
+      partitionState.foreach { state =>
+        stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
+          "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
+          TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch))
+      }
       if (!isShuttingDown.get()) {
         val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]()
         partitionState.foreach {
@@ -366,30 +378,34 @@ class ReplicaManager(val config: KafkaConfig,
                                         BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset))
               case None =>
                 stateChangeLogger.trace(("Broker %d ignored the become-follower state change
with correlation id %d " +
-                                         "controller %d epoch %d for topic-partition %s since
the designated leader %d " +
-                                         "cannot be found in live or shutting down brokers
%s")
-                                           .format(localBrokerId, correlationId, controllerId,
epoch, partition, leader, leaders))
+                                         "controller %d epoch %d for partition %s since the
designated leader %d " +
+                                         "cannot be found in live or shutting down brokers
%s").format(localBrokerId,
+                                         correlationId, controllerId, epoch, partition, leader,
leaders.mkString(",")))
             }
         }
         replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
       }
       else {
-        stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
-          "controller %d epoch %d since it is shutting down")
-          .format(localBrokerId, correlationId, controllerId, epoch))
+        partitionState.foreach { state =>
+          stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
+            "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
+            controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+        }
       }
     } catch {
       case e: Throwable =>
-        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
+        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
           "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
         stateChangeLogger.error(errorMsg, e)
         // Re-throw the exception for it to be caught in KafkaApis
         throw e
     }
 
-    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-      "for the become-follower transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
+    partitionState.foreach { state =>
+      stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+        "for the become-follower transition for partition %s")
+        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+    }
   }
 
   private def maybeShrinkIsr(): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/32aae720/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index c30069e..86f88f5 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -153,6 +153,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    // in sync replicas should not have any replica that is not in the new assigned replicas
+    checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     servers.foreach(_.shutdown())
   }
@@ -179,7 +182,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    // leader should be 2
+    checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
     servers.foreach(_.shutdown())
   }
 
@@ -205,7 +208,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }, 2000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
-    // leader should be 2
+    checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
     servers.foreach(_.shutdown())
   }
 
@@ -222,7 +225,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient)
     assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
-    // leader should be 2
     servers.foreach(_.shutdown())
   }
 
@@ -244,6 +246,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
+    checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
     servers.foreach(_.shutdown())
   }
 
@@ -326,7 +329,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       servers.foreach(_.shutdown())
     }
   }
-  
+
   /**
    * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic
    * then changes the config and checks that the new values take effect.
@@ -336,14 +339,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val partitions = 3
     val topic = "my-topic"
     val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0)))
-    
+
     def makeConfig(messageSize: Int, retentionMs: Long) = {
       var props = new Properties()
       props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
       props.setProperty(LogConfig.RententionMsProp, retentionMs.toString)
       props
     }
-    
+
     def checkConfig(messageSize: Int, retentionMs: Long) {
       TestUtils.retry(10000) {
         for(part <- 0 until partitions) {
@@ -354,14 +357,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
         }
       }
     }
-    
+
     try {
       // create a topic with a few config overrides and check that they are applied
       val maxMessageSize = 1024
       val retentionMs = 1000*1000
       AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
       checkConfig(maxMessageSize, retentionMs)
-      
+
       // now double the config values for the topic and check that it is applied
       AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
       checkConfig(2*maxMessageSize, 2 * retentionMs)
@@ -371,6 +374,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
   }
 
+  private def checkForPhantomInSyncReplicas(topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    // in sync replicas should not have any replica that is not in the new assigned replicas
+    val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
+    assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
+      phantomInSyncReplicas.size == 0)
+  }
+
   private def checkIfReassignPartitionPathExists(): Boolean = {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }
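
The checkForPhantomInSyncReplicas helper added above boils down to a set difference between the ISR read from ZooKeeper and the newly assigned replicas. A small standalone illustration follows; the replica lists below are made-up sample data, whereas the real test reads both via ZkUtils.

// Standalone illustration of the phantom-ISR assertion; the replica lists are made-up sample data.
object PhantomIsrSketch {
  def phantomInSyncReplicas(inSyncReplicas: Seq[Int], assignedReplicas: Seq[Int]): Set[Int] =
    inSyncReplicas.toSet -- assignedReplicas.toSet

  def main(args: Array[String]): Unit = {
    // Broker 1 still appears in the ISR although the partition was reassigned to brokers 0, 2, 3.
    val leftovers = phantomInSyncReplicas(inSyncReplicas = Seq(1, 2, 3), assignedReplicas = Seq(0, 2, 3))
    println(s"phantom in-sync replicas: $leftovers") // Set(1) -- the inconsistency KAFKA-1170 guards against
  }
}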

http://git-wip-us.apache.org/repos/asf/kafka/blob/32aae720/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 7026432..2cd3a3f 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -31,6 +31,7 @@ class IsrExpirationTest extends JUnit3Suite {
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 100L
+    override val replicaFetchWaitMaxMs = 100
     override val replicaLagMaxMessages = 10L
   })
   val topic = "foo"

http://git-wip-us.apache.org/repos/asf/kafka/blob/32aae720/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 34e39e7..17a99f1 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -32,6 +32,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 5000L
     override val replicaLagMaxMessages = 10L
+    override val replicaFetchWaitMaxMs = 1000
     override val replicaFetchMinBytes = 20
   })
   val topic = "new-topic"

http://git-wip-us.apache.org/repos/asf/kafka/blob/32aae720/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index bab436d..03e6266 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -34,6 +34,7 @@ class SimpleFetchTest extends JUnit3Suite {
 
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 100L
+    override val replicaFetchWaitMaxMs = 100
     override val replicaLagMaxMessages = 10L
   })
   val topic = "foo"

