kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch trunk updated: MINOR: Mention leader in a few follower/controller log messages (#4835)
Date Sun, 08 Apr 2018 03:20:28 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fedac0c  MINOR: Mention leader in a few follower/controller log messages (#4835)
fedac0c is described below

commit fedac0cea74feeeece529ee1c0cefd6af53ecbdd
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Sat Apr 7 20:20:21 2018 -0700

    MINOR: Mention leader in a few follower/controller log messages (#4835)
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  3 +-
 .../scala/kafka/controller/KafkaController.scala   |  5 +--
 .../main/scala/kafka/server/ReplicaManager.scala   | 37 ++++++++++++----------
 3 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 6eb9611..93377ba 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -261,7 +261,8 @@ class Partition(val topic: String,
       (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
       inSyncReplicas = newInSyncReplicas
 
-      info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
+      info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from " +
+        s"offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
 
       //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4778a7a..eee625e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -873,7 +873,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   }
 
   private def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicPartition],
-                                                   isTriggeredByAutoRebalance : Boolean) {
+                                                           isTriggeredByAutoRebalance : Boolean) {
     for (partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -881,7 +881,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       if (currentLeader == preferredReplica) {
         info(s"Partition $partition completed preferred replica leader election. New leader is $preferredReplica")
       } else {
-        warn(s"Partition $partition failed to complete preferred replica leader election. Leader is $currentLeader")
+        warn(s"Partition $partition failed to complete preferred replica leader election to $preferredReplica. " +
+          s"Leader is still $currentLeader")
       }
     }
     if (!isTriggeredByAutoRebalance) {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d4abc11..0c2b0d8 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1150,7 +1150,7 @@ class ReplicaManager(val config: KafkaConfig,
     for (partition <- partitionState.keys)
       responseMap.put(partition.topicPartition, Errors.NONE)
 
-    val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
+    val partitionsToMakeLeaders = mutable.Set[Partition]()
 
     try {
       // First stop fetchers for all the partitions
@@ -1218,15 +1218,16 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeFollowers(controllerId: Int,
                             epoch: Int,
-                            partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
+                            partitionStates: Map[Partition, LeaderAndIsrRequest.PartitionState],
                             correlationId: Int,
                             responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
-    partitionState.keys.foreach { partition =>
+    partitionStates.foreach { case (partition, partitionState) =>
       stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
-        s"epoch $epoch starting the become-follower transition for partition ${partition.topicPartition}")
+        s"epoch $epoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
+        s"${partitionState.basePartitionState.leader}")
     }
 
-    for (partition <- partitionState.keys)
+    for (partition <- partitionStates.keys)
       responseMap.put(partition.topicPartition, Errors.NONE)
 
     val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
@@ -1234,9 +1235,9 @@ class ReplicaManager(val config: KafkaConfig,
     try {
 
       // TODO: Delete leaders from LeaderAndIsrRequest
-      partitionState.foreach{ case (partition, partitionStateInfo) =>
+      partitionStates.foreach { case (partition, partitionStateInfo) =>
+        val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
         try {
-          val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
           metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
             // Only change partition state when the leader is available
             case Some(_) =>
@@ -1263,10 +1264,11 @@ class ReplicaManager(val config: KafkaConfig,
           case e: KafkaStorageException =>
             stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " +
               s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
-              s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since the " +
-              s"replica for the partition is offline due to disk error $e")
+              s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) with leader " +
+              s"$newLeaderBrokerId since the replica for the partition is offline due to disk error $e")
             val dirOpt = getLogDir(partition.topicPartition)
-            error(s"Error while making broker the follower for partition $partition in dir $dirOpt", e)
+            error(s"Error while making broker the follower for partition $partition with leader " +
+              s"$newLeaderBrokerId in dir $dirOpt", e)
             responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
         }
       }
@@ -1274,7 +1276,8 @@ class ReplicaManager(val config: KafkaConfig,
       replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
       partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
-          s"epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition}")
+          s"epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
+          s"${partitionStates(partition).basePartitionState.leader}")
       }
 
       partitionsToMakeFollower.foreach { partition =>
@@ -1286,14 +1289,15 @@ class ReplicaManager(val config: KafkaConfig,
       partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " +
           s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " +
-          s"controller $controllerId epoch $epoch")
+          s"controller $controllerId epoch $epoch with leader ${partitionStates(partition).basePartitionState.leader}")
       }
 
       if (isShuttingDown.get()) {
         partitionsToMakeFollower.foreach { partition =>
           stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
             s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " +
-            s"partition ${partition.topicPartition} since it is shutting down")
+            s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader} " +
+            "since it is shutting down")
         }
       }
       else {
@@ -1307,7 +1311,7 @@ class ReplicaManager(val config: KafkaConfig,
         partitionsToMakeFollower.foreach { partition =>
           stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " +
             s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " +
-            s"partition ${partition.topicPartition}")
+            s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader}")
         }
       }
     } catch {
@@ -1318,9 +1322,10 @@ class ReplicaManager(val config: KafkaConfig,
         throw e
     }
 
-    partitionState.keys.foreach { partition =>
+    partitionStates.keys.foreach { partition =>
       stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
-        s"epoch $epoch for the become-follower transition for partition ${partition.topicPartition}")
+        s"epoch $epoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
+        s"${partitionStates(partition).basePartitionState.leader}")
     }
 
     partitionsToMakeFollower

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.

Mime
View raw message