kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2738: Replica FetcherThread should connect to leader endpoint matching its inter-broker security protocol.
Date Thu, 05 Nov 2015 17:05:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b30d68a4e -> d23785ff2


KAFKA-2738: Replica FetcherThread should connect to leader endpoint matching its inter-broker
security protocol.

…matching its inter-broker security protocol

Author: Gwen Shapira <cshapi@gmail.com>

Reviewers: Jun Rao, Guozhang Wang

Closes #428 from gwenshap/KAFKA-2738


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d23785ff
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d23785ff
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d23785ff

Branch: refs/heads/trunk
Commit: d23785ff2df136090e3c80a3a6df5cb85443924b
Parents: b30d68a
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Thu Nov 5 09:11:24 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Nov 5 09:11:24 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala      |  2 +-
 core/src/main/scala/kafka/server/ReplicaManager.scala | 14 +++++++-------
 2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d23785ff/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index df064e4..d1c6f79 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -113,7 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     try {
       // call replica manager to handle updating partitions to become leader or follower
-      val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
+      val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, metadataCache)
       val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId,
result.responseMap, result.errorCode)
       // for each new leader or follower, call coordinator to handle
       // consumer group migration

http://git-wip-us.apache.org/repos/asf/kafka/blob/d23785ff/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 89f2462..7823659 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -583,7 +583,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): BecomeLeaderOrFollowerResult
= {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, metadataCache: MetadataCache):
BecomeLeaderOrFollowerResult = {
     leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo)
=>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id
%d from controller %d epoch %d for partition [%s,%d]"
                                 .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
@@ -639,7 +639,7 @@ class ReplicaManager(val config: KafkaConfig,
         else
           Set.empty[Partition]
         val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
-          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders,
leaderAndISRRequest.correlationId, responseMap)
+          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.correlationId,
responseMap, metadataCache)
         else
           Set.empty[Partition]
 
@@ -731,9 +731,9 @@ class ReplicaManager(val config: KafkaConfig,
   private def makeFollowers(controllerId: Int,
                             epoch: Int,
                             partitionState: Map[Partition, PartitionStateInfo],
-                            leaders: Set[BrokerEndPoint],
                             correlationId: Int,
-                            responseMap: mutable.Map[(String, Int), Short]) : Set[Partition]
= {
+                            responseMap: mutable.Map[(String, Int), Short],
+                            metadataCache: MetadataCache) : Set[Partition] = {
     partitionState.foreach { state =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d
from controller %d epoch %d " +
         "starting the become-follower transition for partition %s")
@@ -751,7 +751,7 @@ class ReplicaManager(val config: KafkaConfig,
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
         val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
-        leaders.find(_.id == newLeaderBrokerId) match {
+        metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
           // Only change partition state when the leader is available
           case Some(leaderBroker) =>
             if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
@@ -762,7 +762,7 @@ class ReplicaManager(val config: KafkaConfig,
                 .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
                 partition.topic, partition.partitionId, newLeaderBrokerId))
           case None =>
-            // The leader broker should always be present in the leaderAndIsrRequest.
+            // The leader broker should always be present in the metadata cache.
             // If not, we should record the error message and abort the transition process
for this partition
             stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation
id %d from controller" +
               " %d epoch %d for partition [%s,%d] but cannot become follower since the new
leader %d is unavailable.")
@@ -800,7 +800,7 @@ class ReplicaManager(val config: KafkaConfig,
         // we do not need to check if the leader exists again since this has been done at
the beginning of this process
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition
=>
           new TopicAndPartition(partition) -> BrokerAndInitialOffset(
-            leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
+            metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
             partition.getReplica().get.logEndOffset.messageOffset)).toMap
         replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
 


Mime
View raw message