kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1350 Fix excessive state change logging; reviewed by Jun,Joel,Guozhang and Timothy
Date Mon, 31 Mar 2014 22:29:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.1 5a6a1d83b -> dd08538a4


KAFKA-1350 Fix excessive state change logging;reviewed by Jun,Joel,Guozhang and Timothy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd08538a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd08538a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd08538a

Branch: refs/heads/0.8.1
Commit: dd08538a4f942695852949cc1e455c38f148486c
Parents: 5a6a1d8
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Mon Mar 31 15:28:52 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Mon Mar 31 15:28:52 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Partition.scala     |  2 +-
 .../kafka/controller/ControllerChannelManager.scala   |  4 ++--
 .../main/scala/kafka/controller/KafkaController.scala |  6 ++++--
 .../kafka/controller/PartitionStateMachine.scala      |  2 +-
 .../scala/kafka/controller/ReplicaStateMachine.scala  |  2 +-
 .../src/main/scala/kafka/network/RequestChannel.scala |  9 ++++++---
 core/src/main/scala/kafka/server/KafkaApis.scala      | 14 ++++++--------
 core/src/main/scala/kafka/server/ReplicaManager.scala |  2 +-
 8 files changed, 22 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd08538a/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1087a2e..810952e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -56,7 +56,7 @@ class Partition(val topic: String,
    * each partition. */
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd08538a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 8ab8ab6..f17d976 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -114,7 +114,7 @@ class RequestSendThread(val controllerId: Int,
                         val channel: BlockingChannel)
   extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId,
toBroker.id)) {
   private val lock = new Object()
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
   connectToBroker(toBroker, channel)
 
   override def doWork(): Unit = {
@@ -188,7 +188,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends
 Logging
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[StopReplicaRequestInfo]]
   val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition,
PartitionStateInfo]]
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
 
   def newBatch() {
     // raise error if the previous batch is not empty

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd08538a/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 2867ef1..d142f8c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -125,10 +125,12 @@ trait KafkaControllerMBean {
 
 object KafkaController extends Logging {
   val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
-  val stateChangeLogger = "state.change.logger"
+  val stateChangeLogger = new StateChangeLogger("state.change.logger")
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1
 
+  case class StateChangeLogger(override val loggerName: String) extends Logging
+
   def parseControllerId(controllerInfoString: String): Int = {
     try {
       Json.parseFull(controllerInfoString) match {
@@ -154,7 +156,7 @@ object KafkaController extends Logging {
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with
KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd08538a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index c3e8d05..6457b56 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -50,7 +50,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging
{
   private val hasStarted = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
   private var topicChangeListener: TopicChangeListener = null
   private var deleteTopicsListener: DeleteTopicsListener = null
   private var addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd08538a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5e016d5..4da43c4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -52,7 +52,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
   private val hasStarted = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId +
"]: "
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
 
   /**
    * Invoked on successful controller election. First registers a broker change listener
since that triggers all

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd08538a/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a6ec970..826831f 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -31,6 +31,9 @@ import org.apache.log4j.Logger
 
 object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
+  val requestLogger = new RequestLogger("kafka.request.logger")
+
+  case class RequestLogger(override val loggerName: String) extends Logging
 
   def getShutdownReceive() = {
     val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition,
ByteBufferMessageSet]())
@@ -49,7 +52,7 @@ object RequestChannel extends Logging {
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer = null
-    private val requestLogger = Logger.getLogger("kafka.request.logger")
+    private val requestLogger = RequestChannel.requestLogger
     trace("Processor %d received request : %s".format(processor, requestObj))
 
     def updateRequestMetrics() {
@@ -81,10 +84,10 @@ object RequestChannel extends Logging {
              m.responseSendTimeHist.update(responseSendTime)
              m.totalTimeHist.update(totalTime)
       }
-      if(requestLogger.isTraceEnabled)
+      if(requestLogger.logger.isTraceEnabled)
         requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
           .format(requestObj.describe(true), remoteAddress, totalTime, requestQueueTime,
apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
-      else if(requestLogger.isDebugEnabled) {
+      else {
         requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
           .format(requestObj.describe(false), remoteAddress, totalTime, requestQueueTime,
apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd08538a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ae2df20..0f137c5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -138,10 +138,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
       updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
         metadataCache.put(partitionState._1, partitionState._2)
-        if(stateChangeLogger.isTraceEnabled)
-          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response
to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2,
partitionState._1,
-            updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response
to UpdateMetadata request " +
+          "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2,
partitionState._1,
+          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
       }
       // remove the topics that don't exist in the UpdateMetadata request since those are
the topics that are
       // currently being deleted by the controller
@@ -155,10 +154,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       }.keySet
       partitionsToBeDeleted.foreach { partition =>
         metadataCache.remove(partition)
-        if(stateChangeLogger.isTraceEnabled)
-          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in
response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
-            updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response
to UpdateMetadata request " +
+          "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
+          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
       }
     }
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd08538a/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0fe881d..7df56ce 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -56,7 +56,7 @@ class ReplicaManager(val config: KafkaConfig,
   val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath,
new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
   private var hwThreadInitialized = false
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
-  val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  val stateChangeLogger = KafkaController.stateChangeLogger
 
   newGauge(
     "LeaderCount",


Mime
View raw message