kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1152; ReplicaManager's handling of the leaderAndIsrRequest should gracefully handle leader == -1; patched by Swapnil Ghike; reviewed by Jun Rao
Date Fri, 29 Nov 2013 17:29:13 GMT
Updated Branches:
  refs/heads/trunk df288b75a -> 224f192c7


kafka-1152; ReplicaManager's handling of the leaderAndIsrRequest should gracefully handle
leader == -1; patched by Swapnil Ghike; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/224f192c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/224f192c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/224f192c

Branch: refs/heads/trunk
Commit: 224f192c776b4348596fbf068771c98aa9db9f3d
Parents: df288b7
Author: Swapnil Ghike <sghike@linkedin.com>
Authored: Fri Nov 29 09:29:56 2013 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Nov 29 09:29:56 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/ReplicaManager.scala     | 24 ++++++++++++++------
 1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/224f192c/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 161f581..54f6e16 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -328,8 +328,7 @@ class ReplicaManager(val config: KafkaConfig,
    * the error message will be set on each partition since we do not know which partition caused it
    *  TODO: the above may need to be fixed later
    */
-  private def makeFollowers(controllerId: Int, epoch: Int,
-                            partitionState: Map[Partition, PartitionStateInfo],
+  private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition,
PartitionStateInfo],
                             leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String,
Int), Short]) {
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from
controller %d epoch %d " +
       "starting the become-follower transition for partitions %s")
@@ -351,11 +350,22 @@ class ReplicaManager(val config: KafkaConfig,
         .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic,
p.partitionId)).mkString(","), controllerId, correlationId))
 
       if (!isShuttingDown.get()) {
-        replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition,
partitionStateInfo) =>
-          new TopicAndPartition(partition) ->
-            BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get,
-              partition.getReplica().get.logEndOffset)}
-        )
+        val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]()
+        partitionState.foreach {
+          case (partition, partitionStateInfo) =>
+            val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+            leaders.find(_.id == leader) match {
+              case Some(leaderBroker) =>
+                partitionAndOffsets.put(new TopicAndPartition(partition), 
+                                        BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset))
+              case None =>
+                stateChangeLogger.trace("Broker %d ignored the become-follower state change
with correlation id %d " +
+                                        "controller %d epoch %d for topic-partition %s since
the designated leader %d " +
+                                        "cannot be found in live or shutting down brokers
%s"
+                                          .format(localBrokerId, correlationId, controllerId,
epoch, partition, leader, leaders))
+            }
+        }
+        replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
       }
       else {
         stateChangeLogger.trace(("Broker %d ignored the become-follower state change with
correlation id %d from " +


Mime
View raw message