kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: MINOR: Make the state change log more consistent
Date: Mon, 18 Sep 2017 16:00:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f2b74aa1c -> ae3eeb3e1


MINOR: Make the state change log more consistent

Use logIdent to achieve this.

Also fix an issue where we logged about replicas going offline with an empty
set of replicas (i.e. no replicas had gone offline, so there was no need to log).

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3869 from ijuma/improve-state-change-log
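
For context, the logIdent mentioned above is a prefix that kafka.utils.Logging
prepends to every message it emits (the new StateChangeLogger.messageWithPrefix
in this patch delegates to the trait's msgWithLogIdent for exactly this reason).
A minimal standalone sketch of the pattern, with a simplified trait standing in
for the real one:

    // Simplified sketch of the logIdent pattern; the real kafka.utils.Logging
    // trait delegates to log4j and defines the full set of log levels.
    trait Logging {
      // Prefix prepended to every message logged through this instance.
      var logIdent: String = ""

      protected def msgWithLogIdent(msg: String): String = logIdent + msg

      def info(msg: => String): Unit = println(msgWithLogIdent(msg))
    }

    class Controller(brokerId: Int) extends Logging {
      this.logIdent = s"[Controller id=$brokerId] "

      def startup(): Unit = info("Starting become controller state transition")
    }

    // new Controller(1).startup() prints:
    // [Controller id=1] Starting become controller state transition

With the prefix set once per component, call sites no longer interpolate the
controller id and epoch into every message by hand, which is what makes the
log lines in the diff below shorter and more consistent.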


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ae3eeb3e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ae3eeb3e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ae3eeb3e

Branch: refs/heads/trunk
Commit: ae3eeb3e1c194c60fa62fbc7931566f3d8c87d68
Parents: f2b74aa
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon Sep 18 09:00:37 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Sep 18 09:00:37 2017 -0700

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   |  28 ++---
 .../kafka/controller/KafkaController.scala      |  60 ++++++-----
 .../controller/PartitionStateMachine.scala      |  84 +++++++--------
 .../kafka/controller/ReplicaStateMachine.scala  |  45 ++++----
 .../kafka/controller/StateChangeLogger.scala    |  50 +++++++++
 .../group/GroupMetadataManager.scala            |   2 +-
 .../main/scala/kafka/server/MetadataCache.scala |  11 +-
 .../scala/kafka/server/ReplicaManager.scala     | 106 +++++++++----------
 core/src/main/scala/kafka/utils/Logging.scala   |   4 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   5 +-
 10 files changed, 216 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ae3eeb3e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 693f297..6976f7c 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -46,7 +46,7 @@ object ControllerChannelManager {
 }
 
 class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics,
-                               threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+                               stateChangeLogger: StateChangeLogger, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   import ControllerChannelManager._
   protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
@@ -151,7 +151,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     }
 
     val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
-      brokerNode, config, time, threadName)
+      brokerNode, config, time, stateChangeLogger, threadName)
     requestThread.setDaemon(false)
 
     val queueSizeGauge = newGauge(
@@ -201,10 +201,10 @@ class RequestSendThread(val controllerId: Int,
                         val brokerNode: Node,
                         val config: KafkaConfig,
                         val time: Time,
+                        val stateChangeLogger: StateChangeLogger,
                         name: String)
   extends ShutdownableThread(name = name) {
 
-  private val stateChangeLogger = KafkaController.stateChangeLogger
   private val socketTimeoutMs = config.controllerSocketTimeoutMs
 
   override def doWork(): Unit = {
@@ -247,8 +247,8 @@ class RequestSendThread(val controllerId: Int,
 
         val response = clientResponse.responseBody
 
-        stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
-          .format(controllerId, controllerContext.epoch, response.toString(requestHeader.apiVersion), brokerNode.toString))
+        stateChangeLogger.withControllerEpoch(controllerContext.epoch).trace("Received response " +
+          s"${response.toString(requestHeader.apiVersion)} for a request sent to broker $brokerNode")
 
         if (callback != null) {
           callback(response)
@@ -282,14 +282,13 @@ class RequestSendThread(val controllerId: Int,
 
 }
 
-class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging {
+class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogger: StateChangeLogger) extends  Logging {
   val controllerContext = controller.controllerContext
   val controllerId: Int = controller.config.brokerId
   val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, LeaderAndIsrRequest.PartitionState]]
   val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
   val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
   val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataRequest.PartitionState]
-  private val stateChangeLogger = KafkaController.stateChangeLogger
 
   def newBatch() {
     // raise error if the previous batch is not empty
@@ -396,6 +395,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
 
   def sendRequestsToBrokers(controllerEpoch: Int) {
     try {
+      val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerEpoch)
+
       val leaderAndIsrRequestVersion: Short =
         if (controller.config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
         else 0
@@ -405,8 +406,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
           val typeOfRequest =
             if (broker == state.basePartitionState.leader) "become-leader"
             else "become-follower"
-          stateChangeLogger.trace(s"Controller $controllerId epoch $controllerEpoch sending " +
-            s"$typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
+          stateChangeLog.trace(s"Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
         }
         val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet
         val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
@@ -419,12 +419,12 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
       }
       leaderAndIsrRequestMap.clear()
 
-      updateMetadataRequestPartitionInfoMap.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
-        "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2,
-        updateMetadataRequestBrokerSet.toString(), p._1)))
-      // Copy the updateMetadataRequestPartitionInfoMap
+      updateMetadataRequestPartitionInfoMap.foreach { case (tp, partitionState) =>
+        stateChangeLog.trace(s"Sending UpdateMetadata request $partitionState to brokers $updateMetadataRequestBrokerSet " +
+          s"for partition $tp")
+      }
 
-      val partitionStates = Map(updateMetadataRequestPartitionInfoMap.toArray:_*)
+      val partitionStates = Map.empty ++ updateMetadataRequestPartitionInfoMap
       val updateMetadataRequestVersion: Short =
         if (controller.config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 4
         else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae3eeb3e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 5ff47bf..5db2c5d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -129,12 +129,10 @@ class ControllerContext(val zkUtils: ZkUtils) {
 
 
 object KafkaController extends Logging {
-  val stateChangeLogger = StateChangeLogger("state.change.logger")
+  
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1
 
-  case class StateChangeLogger(override val loggerName: String) extends Logging
-
   def parseControllerId(controllerInfoString: String): Int = {
     try {
       Json.parseFull(controllerInfoString) match {
@@ -155,11 +153,13 @@ object KafkaController extends Logging {
 }
 
 class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Controller " + config.brokerId + "]: "
-  private val stateChangeLogger = KafkaController.stateChangeLogger
+
+  this.logIdent = s"[Controller id=${config.brokerId}] "
+
+  private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
   val controllerContext = new ControllerContext(zkUtils)
-  val partitionStateMachine = new PartitionStateMachine(this)
-  val replicaStateMachine = new ReplicaStateMachine(this)
+  val partitionStateMachine = new PartitionStateMachine(this, stateChangeLogger)
+  val replicaStateMachine = new ReplicaStateMachine(this, stateChangeLogger)
 
   // have a separate scheduler for the controller to be able to start and stop independently of the kafka server
   // visible for testing
@@ -174,7 +174,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
 
   private val brokerChangeListener = new BrokerChangeListener(this, eventManager)
   private val topicChangeListener = new TopicChangeListener(this, eventManager)
@@ -270,7 +270,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
    * This ensures another controller election will be triggered and there will always be an actively serving controller
    */
   def onControllerFailover() {
-    info("Broker %d starting become controller state transition".format(config.brokerId))
+    info("Starting become controller state transition")
     readControllerEpochFromZookeeper()
     incrementControllerEpoch()
     LogDirUtils.deleteLogDirEvents(zkUtils)
@@ -299,12 +299,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
     // register the partition change listeners for all existing topics on failover
     controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
-    info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
+    info(s"Ready to serve as the new controller with epoch $epoch")
     maybeTriggerPartitionReassignment()
     topicDeletionManager.tryTopicDeletion()
     val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
     onPreferredReplicaElection(pendingPreferredReplicaElections)
-    info("starting the controller scheduler")
+    info("Starting the controller scheduler")
     kafkaScheduler.startup()
     if (config.autoLeaderRebalanceEnable) {
       scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
@@ -321,7 +321,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
    * required to clean up internal controller data structures
    */
   def onControllerResignation() {
-    debug("Controller resigning, broker id %d".format(config.brokerId))
+    debug("Resigning")
     // de-register listeners
     deregisterIsrChangeNotificationListener()
     deregisterPartitionReassignmentListener()
@@ -351,7 +351,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
     resetControllerContext()
 
-    info("Broker %d resigned as the controller".format(config.brokerId))
+    info("Resigned")
   }
 
   /**
@@ -690,7 +690,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       case oe: Throwable => error("Error while incrementing controller epoch", oe)
 
     }
-    info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
+    info(s"Incremented epoch to ${controllerContext.epoch}")
   }
 
   private def registerSessionExpirationListener() = {
@@ -792,7 +792,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   }
 
   private def startChannelManager() {
-    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics, threadNamePrefix)
+    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
+      stateChangeLogger, threadNamePrefix)
     controllerContext.controllerChannelManager.startup()
   }
 
@@ -872,6 +873,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   }
 
   private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
+    val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
     updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
       case Some(updatedLeaderIsrAndControllerEpoch) =>
         try {
@@ -883,13 +885,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
           case e: IllegalStateException =>
             handleIllegalState(e)
         }
-        stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " +
-          "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch,
-          newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition))
+        stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with new assigned replica " +
+          s"list ${newAssignedReplicas.mkString(",")} to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
+          s"for partition being reassigned $topicAndPartition")
       case None => // fail the reassignment
-        stateChangeLogger.error(("Controller %d epoch %d failed to send LeaderAndIsr request with new assigned replica list %s " +
-          "to leader for partition being reassigned %s").format(config.brokerId, controllerContext.epoch,
-          newAssignedReplicas.mkString(","), topicAndPartition))
+        stateChangeLog.error("Failed to send LeaderAndIsr request with new assigned replica list " +
+          s"${newAssignedReplicas.mkString( ",")} to leader for partition being reassigned $topicAndPartition")
     }
   }
 
@@ -1524,23 +1525,24 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       import JavaConverters._
       val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
 
-      if (leaderAndIsrResponse.error() != Errors.NONE) {
-        stateChangeLogger.error(s"Received error in leaderAndIsrResponse $leaderAndIsrResponse from broker $brokerId")
+      if (leaderAndIsrResponse.error != Errors.NONE) {
+        stateChangeLogger.error(s"Received error in LeaderAndIsr response $leaderAndIsrResponse from broker $brokerId")
         return
       }
 
       val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map(
-        tp => TopicAndPartition(tp.topic(), tp.partition())).toSet
+        new TopicAndPartition(_)).toSet
       val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map(
-        tp => TopicAndPartition(tp.topic(), tp.partition())).toSet
+        new TopicAndPartition(_)).toSet
       val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
       val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
       controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
-      val newOfflineReplicas = (currentOfflineReplicas -- previousOfflineReplicas).map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId))
-      stateChangeLogger.info(s"Mark replicas ${currentOfflineReplicas -- previousOfflineReplicas} on broker $brokerId as offline")
+      val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
 
-      if (newOfflineReplicas.nonEmpty)
-        onReplicasBecomeOffline(newOfflineReplicas)
+      if (newOfflineReplicas.nonEmpty) {
+        stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
+        onReplicasBecomeOffline(newOfflineReplicas.map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId)))
+      }
     }
   }
 

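The empty-set fix described in the commit message lands in the last hunk above;
the set arithmetic is simple enough to show standalone (a sketch with
hypothetical partition names, plain strings standing in for TopicAndPartition):

    // Mirrors the offline-replica bookkeeping in the hunk above, with made-up data.
    val previousOfflineReplicas = Set("t-0", "t-1")
    val onlineReplicas = Set("t-0", "t-1")  // every previously offline replica recovered
    val offlineReplicas = Set.empty[String] // none reported KAFKA_STORAGE_ERROR

    val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
    val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas // empty here

    // Previously the "Mark replicas ... as offline" line was logged even when
    // newOfflineReplicas was empty; the patch guards both the log line and the
    // onReplicasBecomeOffline callback behind nonEmpty.
    if (newOfflineReplicas.nonEmpty)
      println(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker 1 as offline")
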
http://git-wip-us.apache.org/repos/asf/kafka/blob/ae3eeb3e/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 7257501..9e75bc0 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -37,17 +37,16 @@ import scala.collection._
  * 4. OfflinePartition    : If, after successful leader election, the leader for partition dies, then the partition
  *                          moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
  */
-class PartitionStateMachine(controller: KafkaController) extends Logging {
+class PartitionStateMachine(controller: KafkaController, stateChangeLogger: StateChangeLogger) extends Logging {
+
   private val controllerContext = controller.controllerContext
   private val controllerId = controller.config.brokerId
   private val zkUtils = controllerContext.zkUtils
   private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller, stateChangeLogger)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
 
-  private val stateChangeLogger = KafkaController.stateChangeLogger
-
-  this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
+  this.logIdent = s"[PartitionStateMachine controllerId=$controllerId] "
 
   /**
    * Invoked on successful controller election. First registers a topic change listener since that triggers all
@@ -58,7 +57,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     initializePartitionState()
     triggerOnlinePartitionStateChange()
 
-    info("Started partition state machine with initial state -> " + partitionState.toString())
+    info(s"Started partition state machine with initial state -> $partitionState")
   }
 
   /**
@@ -145,15 +144,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
                                 callbacks: Callbacks) {
     val topicAndPartition = TopicAndPartition(topic, partition)
     val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
+    val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
     try {
       assertValidTransition(topicAndPartition, targetState)
       targetState match {
         case NewPartition =>
           partitionState.put(topicAndPartition, NewPartition)
           val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
-                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
-                                            assignedReplicas))
+          stateChangeLog.trace(s"Changed partition $topicAndPartition state from $currState to $targetState with " +
+            s"assigned replicas $assignedReplicas")
           // post: partition has been assigned replicas
         case OnlinePartition =>
           partitionState(topicAndPartition) match {
@@ -168,25 +167,22 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           }
           partitionState.put(topicAndPartition, OnlinePartition)
           val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
-                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
+          stateChangeLog.trace(s"Changed partition $topicAndPartition from $currState to $targetState with leader $leader")
            // post: partition has a leader
         case OfflinePartition =>
           // should be called when the leader for a partition is no longer alive
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
-                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
+          stateChangeLog.trace(s"Changed partition $topicAndPartition state from $currState to $targetState")
           partitionState.put(topicAndPartition, OfflinePartition)
           // post: partition has no alive leader
         case NonExistentPartition =>
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
-                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
+          stateChangeLogger.trace(s"Changed partition $topicAndPartition state from $currState to $targetState")
           partitionState.put(topicAndPartition, NonExistentPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
     } catch {
       case t: Throwable =>
-        stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
-          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
+        stateChangeLog.error(s"Initiated state change for partition $topicAndPartition from $currState to $targetState failed",
+          t)
     }
   }
 
@@ -228,15 +224,14 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) = {
     val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition).toList
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition))
+    val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
     liveAssignedReplicas.headOption match {
       case None =>
-        val failMsg = s"Controller $controllerId epoch ${controller.epoch} encountered error during state change of " +
-          s"partition $topicAndPartition from New to Online, assigned replicas are " +
-          s"[${replicaAssignment.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " +
-          "replica is alive."
-
-        stateChangeLogger.error(failMsg)
-        throw new StateChangeFailedException(failMsg)
+        val failMsg = s"Encountered error during state change of partition $topicAndPartition from New to Online, " +
+          s"assigned replicas are [${replicaAssignment.mkString(",")}], live brokers are " +
+          s"[${controllerContext.liveBrokerIds}]. No assigned replica is alive."
+        stateChangeLog.error(failMsg)
+        throw new StateChangeFailedException(stateChangeLog.messageWithPrefix(failMsg))
 
       // leader is the first replica in the list of assigned replicas
       case Some(leader) =>
@@ -268,11 +263,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
               topicAndPartition.partition).get
 
-            val failMsg = s"Controller $controllerId epoch ${controller.epoch} encountered error while changing " +
-              s"partition $topicAndPartition's state from New to Online since LeaderAndIsr path already exists with " +
-              s"value ${leaderIsrAndEpoch.leaderAndIsr} and controller epoch ${leaderIsrAndEpoch.controllerEpoch}"
-            stateChangeLogger.error(failMsg)
-            throw new StateChangeFailedException(failMsg)
+            val failMsg = s"Encountered error while changing partition $topicAndPartition's state from New to Online " +
+              s"since LeaderAndIsr path already exists with value ${leaderIsrAndEpoch.leaderAndIsr} and controller " +
+              s"epoch ${leaderIsrAndEpoch.controllerEpoch}"
+            stateChangeLog.error(failMsg)
+            throw new StateChangeFailedException(stateChangeLog.messageWithPrefix(failMsg))
         }
     }
   }
@@ -286,9 +281,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    */
   def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
     val topicAndPartition = TopicAndPartition(topic, partition)
+    val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
     // handle leader election for the partitions whose leader is no longer alive
-    stateChangeLogger.trace("Controller %d epoch %d started leader election for partition %s"
-                              .format(controllerId, controller.epoch, topicAndPartition))
+    stateChangeLog.trace(s"Started leader election for partition $topicAndPartition")
     try {
       var zookeeperPathUpdateSucceeded: Boolean = false
       var newLeaderAndIsr: LeaderAndIsr = null
@@ -298,11 +293,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
         val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
         if (controllerEpoch > controller.epoch) {
-          val failMsg = s"aborted leader election for partition $topicAndPartition since the LeaderAndIsr path was " +
-            s"already written by another controller. This probably means that the current controller $controllerId went through " +
-            s"a soft failure and another controller was elected with epoch $controllerEpoch."
-          stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
-          throw new StateChangeFailedException(failMsg)
+          val failMsg = s"Aborted leader election for partition $topicAndPartition since the LeaderAndIsr path was " +
+            s"already written by another controller. This probably means that the current controller $controllerId went " +
+            s"through a soft failure and another controller was elected with epoch $controllerEpoch."
+          stateChangeLog.error(failMsg)
+          throw new StateChangeFailedException(stateChangeLog.messageWithPrefix(failMsg))
         }
         // elect new leader or throw exception
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
@@ -315,8 +310,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       val newLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
       controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
-      stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
-                                .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
+      stateChangeLog.trace(s"Elected leader ${newLeaderAndIsr.leader} for Offline partition $topicAndPartition")
       val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
@@ -325,21 +319,19 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       case _: LeaderElectionNotNeededException => // swallow
       case nroe: NoReplicaOnlineException => throw nroe
       case sce: Throwable =>
-        val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
-        stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
-        throw new StateChangeFailedException(failMsg, sce)
+        val failMsg = s"Encountered error while electing leader for partition $topicAndPartition due to: ${sce.getMessage}"
+        stateChangeLog.error(failMsg)
+        throw new StateChangeFailedException(stateChangeLog.messageWithPrefix(failMsg), sce)
     }
-    debug("After leader election, leader cache is updated to %s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
+    debug(s"After leader election, leader cache is updated to ${controllerContext.partitionLeadershipInfo}")
   }
 
   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
     val topicAndPartition = TopicAndPartition(topic, partition)
     ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) match {
       case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
-      case None =>
-        val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state"
-                        .format(topicAndPartition, partitionState(topicAndPartition))
-        throw new StateChangeFailedException(failMsg)
+      case None => throw new StateChangeFailedException("LeaderAndIsr information doesn't exist for " +
+        s"partition $topicAndPartition in ${partitionState(topicAndPartition)} state")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae3eeb3e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 6b7adfe..f140f02 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -40,16 +40,15 @@ import scala.collection._
  * 7. NonExistentReplica: If a replica is deleted successfully, it is moved to this state. Valid previous state is
  *                        ReplicaDeletionSuccessful
  */
-class ReplicaStateMachine(controller: KafkaController) extends Logging {
+class ReplicaStateMachine(controller: KafkaController, stateChangeLogger: StateChangeLogger) extends Logging {
+
   private val controllerContext = controller.controllerContext
   private val controllerId = controller.config.brokerId
   private val zkUtils = controllerContext.zkUtils
   private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
-
-  private val stateChangeLogger = KafkaController.stateChangeLogger
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller, stateChangeLogger)
 
-  this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
+  this.logIdent = s"[ReplicaStateMachine controllerId=$controllerId] "
 
 
   /**
@@ -135,7 +134,13 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     val replicaId = partitionAndReplica.replica
     val topicAndPartition = TopicAndPartition(topic, partition)
     val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
+    val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
     try {
+
+      def logStateChange(): Unit =
+        stateChangeLog.trace(s"Changed state of replica $replicaId for partition $topicAndPartition from " +
+          s"$currState to $targetState")
+
       val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
       assertValidTransition(partitionAndReplica, targetState)
       targetState match {
@@ -153,31 +158,25 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put(partitionAndReplica, NewReplica)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
-                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
-                                            targetState))
+          logStateChange()
         case ReplicaDeletionStarted =>
           replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
           // send stop replica command
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
             callbacks.stopReplicaResponseCallback)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
-            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
+          logStateChange()
         case ReplicaDeletionIneligible =>
           replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
-            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
+          logStateChange()
         case ReplicaDeletionSuccessful =>
           replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
-            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
+          logStateChange()
         case NonExistentReplica =>
           // remove this replica from the assigned replicas list for its partition
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
           controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
           replicaState.remove(partitionAndReplica)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
-            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
+          logStateChange()
         case OnlineReplica =>
           replicaState(partitionAndReplica) match {
             case NewReplica =>
@@ -185,9 +184,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
               if(!currentAssignedReplicas.contains(replicaId))
                 controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
-              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
-                                        .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
-                                                targetState))
+              logStateChange()
             case _ =>
               // check if the leader for this partition ever existed
               controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
@@ -195,8 +192,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                     replicaAssignment)
                   replicaState.put(partitionAndReplica, OnlineReplica)
-                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
-                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
+                  logStateChange()
                 case None => // that means the partition was never in OnlinePartition state, this means the broker never
                              // started a log for that partition and does not have a high watermark value for this partition
               }
@@ -218,8 +214,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                         topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                     }
                     replicaState.put(partitionAndReplica, OfflineReplica)
-                    stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
-                      .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
+                    logStateChange()
                     false
                   case None =>
                     true
@@ -236,8 +231,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     }
     catch {
       case t: Throwable =>
-        stateChangeLogger.error(s"Controller $controllerId epoch ${controller.epoch} initiated state change of " +
-          s"replica $replicaId for partition $topicAndPartition from $currState to $targetState failed", t)
+        stateChangeLog.error(s"Initiated state change of replica $replicaId for partition $topicAndPartition from " +
+          s"$currState to $targetState failed", t)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae3eeb3e/core/src/main/scala/kafka/controller/StateChangeLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/StateChangeLogger.scala b/core/src/main/scala/kafka/controller/StateChangeLogger.scala
new file mode 100644
index 0000000..21c70c3
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/StateChangeLogger.scala
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import kafka.utils.Logging
+import org.apache.log4j
+
+object StateChangeLogger {
+  private val Logger = log4j.Logger.getLogger("state.change.logger")
+}
+
+/**
+ * Simple class that sets `logIdent` appropriately depending on whether the state change logger is being used in the
+ * context of the KafkaController or not (e.g. ReplicaManager and MetadataCache log to the state change logger
+ * irrespective of whether the broker is the Controller).
+ */
+class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, controllerEpoch: Option[Int]) extends Logging {
+
+  if (controllerEpoch.isDefined && !inControllerContext)
+    throw new IllegalArgumentException("Controller epoch should only be defined if inControllerContext is true")
+
+  override lazy val logger = StateChangeLogger.Logger
+
+  locally {
+    val prefix = if (inControllerContext) "Controller" else "Broker"
+    val epochEntry = controllerEpoch.fold("")(epoch => s" epoch=$epoch")
+    logIdent = s"[$prefix id=$brokerId$epochEntry] "
+  }
+
+  def withControllerEpoch(controllerEpoch: Int): StateChangeLogger =
+    new StateChangeLogger(brokerId, inControllerContext, Some(controllerEpoch))
+
+  def messageWithPrefix(message: String): String = msgWithLogIdent(message)
+
+}
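
As a usage sketch, the class above yields prefixes like the following (the
expected strings follow directly from the logIdent construction in the
locally block):

    // Usage sketch for the StateChangeLogger defined above.
    val brokerLogger = new StateChangeLogger(0, inControllerContext = false, None)
    // logIdent == "[Broker id=0] "

    val controllerLogger = new StateChangeLogger(0, inControllerContext = true, None)
    val epochLogger = controllerLogger.withControllerEpoch(5)
    // logIdent == "[Controller id=0 epoch=5] "

    epochLogger.trace("Started leader election for partition t-0")
    // state.change.logger: [Controller id=0 epoch=5] Started leader election for partition t-0

    // messageWithPrefix exposes the same prefix for exception messages, e.g.
    // epochLogger.messageWithPrefix("boom") == "[Controller id=0 epoch=5] boom"

Since withControllerEpoch returns a new instance rather than mutating the
current one, a method can capture the epoch once (val stateChangeLog =
stateChangeLogger.withControllerEpoch(controllerEpoch)) and reuse it for every
message and StateChangeFailedException in that scope, as the controller code
in this patch does.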

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae3eeb3e/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index db2bd67..c2ebbad 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -80,7 +80,7 @@ class GroupMetadataManager(brokerId: Int,
    * We use this structure to quickly find the groups which need to be updated by the commit/abort marker. */
   private val openGroupsForProducer = mutable.HashMap[Long, mutable.Set[String]]()
 
-  this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: "
+  this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "
 
   newGauge("NumOffsets",
     new Gauge[Int] {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae3eeb3e/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 293b0dc..fd11bfa 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.api._
 import kafka.common.{BrokerEndPointNotAvailableException, TopicAndPartition}
-import kafka.controller.KafkaController
+import kafka.controller.StateChangeLogger
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.internals.Topic
@@ -38,14 +38,15 @@ import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
 class MetadataCache(brokerId: Int) extends Logging {
-  private val stateChangeLogger = KafkaController.stateChangeLogger
+
   private val cache = mutable.Map[String, mutable.Map[Int, UpdateMetadataRequest.PartitionState]]()
   private var controllerId: Option[Int] = None
   private val aliveBrokers = mutable.Map[Int, Broker]()
   private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
 
-  this.logIdent = s"[Kafka Metadata Cache on broker $brokerId] "
+  this.logIdent = s"[MetadataCache brokerId=$brokerId] "
+  private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
 
   // This method is the main hotspot when it comes to the performance of metadata requests,
   // we should be careful about adding additional logic here.
@@ -207,12 +208,12 @@ class MetadataCache(brokerId: Int) extends Logging {
         val controllerEpoch = updateMetadataRequest.controllerEpoch
         if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
           removePartitionInfo(tp.topic, tp.partition)
-          stateChangeLogger.trace(s"Broker $brokerId deleted partition $tp from metadata cache in response to UpdateMetadata " +
+          stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
             s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
           deletedPartitions += tp
         } else {
           addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
-          stateChangeLogger.trace(s"Broker $brokerId cached leader info $info for partition $tp in response to " +
+          stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
             s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae3eeb3e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 113dbcb..064472d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
-import kafka.controller.KafkaController
+import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log.{Log, LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
@@ -182,10 +182,12 @@ class ReplicaManager(val config: KafkaConfig,
 
   private var hwThreadInitialized = false
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
-  val stateChangeLogger = KafkaController.stateChangeLogger
+  private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
+
   private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
   private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
   private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
+
   private var logDirFailureHandler: LogDirFailureHandler = null
 
   private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
@@ -315,7 +317,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Errors  = {
-    stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica (delete=$deletePartition) for partition $topicPartition")
+    stateChangeLogger.trace(s"Handling stop replica (delete=$deletePartition) for partition $topicPartition")
     val error = Errors.NONE
     getPartition(topicPartition) match {
       case Some(partition) =>
@@ -340,9 +342,9 @@ class ReplicaManager(val config: KafkaConfig,
         // This could happen when topic is being deleted while broker is down and recovers.
         if (deletePartition && logManager.getLog(topicPartition).isDefined)
           logManager.asyncDelete(topicPartition)
-        stateChangeLogger.trace(s"Broker $localBrokerId ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker")
+        stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker")
     }
-    stateChangeLogger.trace(s"Broker $localBrokerId finished handling stop replica (delete=$deletePartition) for partition $topicPartition")
+    stateChangeLogger.trace(s"Finished handling stop replica (delete=$deletePartition) for partition $topicPartition")
     error
   }
 
@@ -350,8 +352,8 @@ class ReplicaManager(val config: KafkaConfig,
     replicaStateChangeLock synchronized {
       val responseMap = new collection.mutable.HashMap[TopicPartition, Errors]
       if(stopReplicaRequest.controllerEpoch() < controllerEpoch) {
-        stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d. Latest known controller epoch is %d"
-          .format(localBrokerId, stopReplicaRequest.controllerEpoch, controllerEpoch))
+        stateChangeLogger.warn("Received stop replica request from an old controller epoch " +
+          s"${stopReplicaRequest.controllerEpoch}. Latest known controller epoch is $controllerEpoch")
         (responseMap, Errors.STALE_CONTROLLER_EPOCH)
       } else {
         val partitions = stopReplicaRequest.partitions.asScala
@@ -364,7 +366,8 @@ class ReplicaManager(val config: KafkaConfig,
             responseMap.put(topicPartition, error)
           } catch {
             case e: KafkaStorageException =>
-              stateChangeLogger.error(s"Broker $localBrokerId ignoring stop replica (delete=${stopReplicaRequest.deletePartitions}) for partition $topicPartition due to storage exception", e)
+              stateChangeLogger.error(s"Ignoring stop replica (delete=${stopReplicaRequest.deletePartitions}) for " +
+                s"partition $topicPartition due to storage exception", e)
               responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
           }
         }
@@ -958,12 +961,11 @@ class ReplicaManager(val config: KafkaConfig,
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {
       if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
-        val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
-          "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
-          correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
-          controllerEpoch)
+        val stateControllerEpochErrorMessage = s"Received update metadata request with correlation id $correlationId " +
+          s"from an old controller ${updateMetadataRequest.controllerId} with epoch ${updateMetadataRequest.controllerEpoch}. " +
+          s"Latest known controller epoch is $controllerEpoch"
         stateChangeLogger.warn(stateControllerEpochErrorMessage)
-        throw new ControllerMovedException(stateControllerEpochErrorMessage)
+        throw new ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage))
       } else {
         val deletedPartitions = metadataCache.updateCache(correlationId, updateMetadataRequest)
         controllerEpoch = updateMetadataRequest.controllerEpoch
@@ -976,17 +978,16 @@ class ReplicaManager(val config: KafkaConfig,
                              leaderAndISRRequest: LeaderAndIsrRequest,
                              onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
     leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
-      stateChangeLogger.trace(s"Broker $localBrokerId received LeaderAndIsr request $stateInfo " +
+      stateChangeLogger.trace(s"Received LeaderAndIsr request $stateInfo " +
         s"correlation id $correlationId from controller ${leaderAndISRRequest.controllerId} " +
         s"epoch ${leaderAndISRRequest.controllerEpoch} for partition $topicPartition")
     }
     replicaStateChangeLock synchronized {
       val responseMap = new mutable.HashMap[TopicPartition, Errors]
       if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-        stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
-          s"controller ${leaderAndISRRequest.controllerId} with correlation id $correlationId since " +
-          s"its controller epoch ${leaderAndISRRequest.controllerEpoch} is old. Latest known controller " +
-          s"epoch is $controllerEpoch")
+        stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndISRRequest.controllerId} with " +
+          s"correlation id $correlationId since its controller epoch ${leaderAndISRRequest.controllerEpoch} is old. " +
+          s"Latest known controller epoch is $controllerEpoch")
         BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
@@ -998,7 +999,7 @@ class ReplicaManager(val config: KafkaConfig,
           val partition = getOrCreatePartition(topicPartition)
           val partitionLeaderEpoch = partition.getLeaderEpoch
           if (partition eq ReplicaManager.OfflinePartition) {
-            stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
+            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
               s"controller $controllerId with correlation id $correlationId " +
               s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
               "partition is in an offline log directory")
@@ -1009,15 +1010,14 @@ class ReplicaManager(val config: KafkaConfig,
             if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
               partitionState.put(partition, stateInfo)
             else {
-              stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
-                s"controller $controllerId with correlation id $correlationId " +
-                s"epoch $controllerEpoch for partition $topicPartition as itself is not in assigned " +
-                s"replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
+              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
+                s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
+                s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
               responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
             }
           } else {
             // Otherwise record the error code in response
-            stateChangeLogger.warn(s"Broker $localBrokerId ignoring LeaderAndIsr request from " +
+            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
               s"controller $controllerId with correlation id $correlationId " +
               s"epoch $controllerEpoch for partition $topicPartition since its associated " +
               s"leader epoch ${stateInfo.basePartitionState.leaderEpoch} is not higher than the current " +
@@ -1083,7 +1083,7 @@ class ReplicaManager(val config: KafkaConfig,
                           correlationId: Int,
                           responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
     partitionState.keys.foreach { partition =>
-      stateChangeLogger.trace(s"Broker $localBrokerId handling LeaderAndIsr request correlationId $correlationId from " +
+      stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " +
         s"controller $controllerId epoch $epoch starting the become-leader transition for " +
         s"partition ${partition.topicPartition}")
     }
@@ -1101,17 +1101,17 @@ class ReplicaManager(val config: KafkaConfig,
         try {
           if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
             partitionsToMakeLeaders += partition
-            stateChangeLogger.trace(s"Broker $localBrokerId stopped fetchers as part of become-leader request from " +
+            stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
               s"controller $controllerId epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} " +
               s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch})")
           } else
-            stateChangeLogger.info(s"Broker $localBrokerId skipped the become-leader state change after marking its " +
+            stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
               s"partition as leader with correlation id $correlationId from controller $controllerId epoch $epoch for " +
               s"partition ${partition.topicPartition} (last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
               s"since it is already the leader for the partition.")
         } catch {
           case e: KafkaStorageException =>
-            stateChangeLogger.error(s"Broker $localBrokerId skipped the become-leader state change with " +
+            stateChangeLogger.error(s"Skipped the become-leader state change with " +
               s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
               s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since " +
               s"the replica for the partition is offline due to disk error $e")
@@ -1124,17 +1124,16 @@ class ReplicaManager(val config: KafkaConfig,
     } catch {
       case e: Throwable =>
         partitionState.keys.foreach { partition =>
-          stateChangeLogger.error(s"Error on broker $localBrokerId while processing LeaderAndIsr request " +
-            s"correlationId $correlationId received from controller $controllerId epoch $epoch for " +
-            s"partition ${partition.topicPartition}", e)
+          stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " +
+            s"from controller $controllerId epoch $epoch for partition ${partition.topicPartition}", e)
         }
         // Re-throw the exception for it to be caught in KafkaApis
         throw e
     }
 
     partitionState.keys.foreach { partition =>
-      stateChangeLogger.trace(s"Broker $localBrokerId completed LeaderAndIsr request correlationId $correlationId from " +
-        s"controller $controllerId epoch $epoch for the become-leader transition for partition ${partition.topicPartition}")
+      stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
+        s"epoch $epoch for the become-leader transition for partition ${partition.topicPartition}")
     }
 
     partitionsToMakeLeaders
@@ -1164,9 +1163,8 @@ class ReplicaManager(val config: KafkaConfig,
                             correlationId: Int,
                             responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
     partitionState.keys.foreach { partition =>
-      stateChangeLogger.trace(s"Broker $localBrokerId handling LeaderAndIsr request correlationId $correlationId from " +
-        s"controller $controllerId epoch $epoch starting the become-follower transition for " +
-        s"partition ${partition.topicPartition}")
+      stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
+        s"epoch $epoch starting the become-follower transition for partition ${partition.topicPartition}")
     }
 
     for (partition <- partitionState.keys)
@@ -1186,16 +1184,16 @@ class ReplicaManager(val config: KafkaConfig,
               if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
                 partitionsToMakeFollower += partition
               else
-                stateChangeLogger.info(s"Broker $localBrokerId skipped the become-follower state change after marking " +
-                  s"its partition as follower with correlation id $correlationId from controller $controllerId epoch $epoch " +
+                stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
+                  s"follower with correlation id $correlationId from controller $controllerId epoch $epoch " +
                   s"for partition ${partition.topicPartition} (last update " +
                   s"controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
                   s"since the new leader $newLeaderBrokerId is the same as the old leader")
             case None =>
               // The leader broker should always be present in the metadata cache.
               // If not, we should record the error message and abort the transition process for this partition
-              stateChangeLogger.error(s"Broker $localBrokerId received LeaderAndIsrRequest with " +
-                s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+              stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
+                s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
                 s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
                 s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
               // Create the local replica even if the leader is unavailable. This is required to ensure that we include
@@ -1204,8 +1202,8 @@ class ReplicaManager(val config: KafkaConfig,
           }
         } catch {
           case e: KafkaStorageException =>
-            stateChangeLogger.error(s"Broker $localBrokerId skipped the become-follower state change with " +
-              s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
+            stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " +
+              s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
               s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since the " +
               s"replica for the partition is offline due to disk error $e")
             val dirOpt = getLogDir(partition.topicPartition)
@@ -1216,9 +1214,8 @@ class ReplicaManager(val config: KafkaConfig,
 
       replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
       partitionsToMakeFollower.foreach { partition =>
-        stateChangeLogger.trace(s"Broker $localBrokerId stopped fetchers as part of become-follower request from " +
-          s"controller $controllerId epoch $epoch with correlation id $correlationId for " +
-          s"partition ${partition.topicPartition}")
+        stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
+          s"epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition}")
       }
 
       partitionsToMakeFollower.foreach { partition =>
@@ -1228,14 +1225,14 @@ class ReplicaManager(val config: KafkaConfig,
       }
 
       partitionsToMakeFollower.foreach { partition =>
-        stateChangeLogger.trace(s"Broker $localBrokerId truncated logs and checkpointed recovery boundaries for " +
-          s"partition ${partition.topicPartition} as part of become-follower request with " +
-          s"correlation id $correlationId from controller $controllerId epoch $epoch")
+        stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " +
+          s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " +
+          s"controller $controllerId epoch $epoch")
       }
 
       if (isShuttingDown.get()) {
         partitionsToMakeFollower.foreach { partition =>
-          stateChangeLogger.trace(s"Broker $localBrokerId skipped the adding-fetcher step of the become-follower state " +
+          stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
             s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " +
             s"partition ${partition.topicPartition} since it is shutting down")
         }
@@ -1249,23 +1246,22 @@ class ReplicaManager(val config: KafkaConfig,
         replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
 
         partitionsToMakeFollower.foreach { partition =>
-          stateChangeLogger.trace(s"Broker $localBrokerId started fetcher to new leader as part of become-follower " +
+          stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " +
             s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " +
             s"partition ${partition.topicPartition}")
         }
       }
     } catch {
       case e: Throwable =>
-        stateChangeLogger.error(s"Error on broker $localBrokerId while processing LeaderAndIsr request with " +
-          s"correlationId $correlationId received from controller $controllerId epoch $epoch", e)
+        stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
+          s"received from controller $controllerId epoch $epoch", e)
         // Re-throw the exception for it to be caught in KafkaApis
         throw e
     }
 
     partitionState.keys.foreach { partition =>
-      stateChangeLogger.trace(s"Broker $localBrokerId completed LeaderAndIsr request correlationId $correlationId from " +
-        s"controller $controllerId epoch $epoch for the become-follower transition for " +
-        s"partition ${partition.topicPartition}")
+      stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
+        s"epoch $epoch for the become-follower transition for partition ${partition.topicPartition}")
     }
 
     partitionsToMakeFollower

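The ReplicaManager rewrites above are mechanical: every message drops its hand-built "Broker $localBrokerId" prefix because the logger now prepends the broker identity itself through logIdent (see the Logging.scala hunk below). The following is a minimal, hypothetical sketch of that pattern, not the actual StateChangeLogger source (which this diff does not show); the exact prefix format is an assumption for illustration:

// Hypothetical sketch of the logIdent pattern; names other than
// logIdent/msgWithLogIdent are invented for this example.
trait SimpleLogging {
  // Subclasses set the prefix once; every message then carries it.
  protected def logIdent: String

  protected def msgWithLogIdent(msg: String): String =
    if (logIdent == null) msg else logIdent + msg

  def warn(msg: => String): Unit = println("WARN " + msgWithLogIdent(msg))
}

// One logger per broker replaces the repeated "Broker $localBrokerId ..."
// that each call site previously interpolated by hand.
class BrokerStateChangeLogger(brokerId: Int) extends SimpleLogging {
  override protected val logIdent: String = s"[Broker id=$brokerId] "
}

object LogIdentExample extends App {
  val logger = new BrokerStateChangeLogger(brokerId = 1)
  // Prints: WARN [Broker id=1] Ignoring LeaderAndIsr request from controller 0
  logger.warn("Ignoring LeaderAndIsr request from controller 0")
}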
http://git-wip-us.apache.org/repos/asf/kafka/blob/ae3eeb3e/core/src/main/scala/kafka/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index f2cd4e9..c2585ad 100755
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -28,8 +28,8 @@ trait Logging {
   // Force initialization to register Log4jControllerMBean
   private val log4jController = Log4jController
 
-  private def msgWithLogIdent(msg: String) = 
-    if(logIdent == null) msg else logIdent + msg
+  protected def msgWithLogIdent(msg: String) =
+    if (logIdent == null) msg else logIdent + msg
 
   def trace(msg: => String): Unit = {
     if (logger.isTraceEnabled())

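The Logging.scala change is small but load-bearing: msgWithLogIdent was private, so only the Logging trait itself could apply the logIdent prefix; making it protected lets a subclass, such as the StateChangeLogger imported in the test diff below, reuse that logic. A small, hypothetical illustration of why the visibility matters (all names here are invented for the sketch):

trait Base {
  protected def logIdent: String

  // Were this `private`, the call in Derived below would not compile.
  protected def msgWithLogIdent(msg: String): String =
    if (logIdent == null) msg else logIdent + msg
}

class Derived extends Base {
  override protected val logIdent: String = "[Derived] "

  // Legal only because msgWithLogIdent is protected rather than private.
  def render(msg: String): String = msgWithLogIdent(msg)
}

object VisibilityExample extends App {
  assert(new Derived().render("hello") == "[Derived] hello")
}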
http://git-wip-us.apache.org/repos/asf/kafka/blob/ae3eeb3e/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 0118a39..f608b95 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.requests._
 import org.junit.Assert._
 import kafka.utils.TestUtils
 import kafka.cluster.Broker
-import kafka.controller.{ControllerChannelManager, ControllerContext}
+import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.metrics.Metrics
@@ -138,7 +138,8 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val controllerContext = new ControllerContext(zkUtils)
     controllerContext.liveBrokers = brokers.toSet
     val metrics = new Metrics
-    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM, metrics)
+    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM,
+      metrics, new StateChangeLogger(controllerId, inControllerContext = true, None))
     controllerChannelManager.startup()
     try {
       val staleControllerEpoch = 0

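The test change follows directly from the new constructor signature: ControllerChannelManager now takes a StateChangeLogger, so LeaderElectionTest builds one inline with inControllerContext = true. The design choice is plain dependency injection: a single logger instance, carrying the broker or controller identity once, is threaded through the constructors instead of each class assembling its own prefix per message. Any other caller of ControllerChannelManager needs the same one-line adjustment; the meaning of the final None argument is not shown in this diff, so no interpretation is offered here.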
