kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1377172 - in /incubator/kafka/trunk/core/src/main/scala/kafka: Kafka.scala producer/async/DefaultEventHandler.scala utils/Log4jController.scala utils/Logging.scala utils/Utils.scala
Date Sat, 25 Aug 2012 01:04:59 GMT
Author: junrao
Date: Sat Aug 25 01:04:59 2012
New Revision: 1377172

URL: http://svn.apache.org/viewvc?rev=1377172&view=rev
Log:
Expose JMX operation to set logger level dynamically; patched by Jun Rao; reviewed by Jay
Kreps; KAFKA-429

Added:
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Log4jController.scala
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala?rev=1377172&r1=1377171&r2=1377172&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala Sat Aug 25 01:04:59 2012
@@ -19,14 +19,11 @@ package kafka
 
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
 import utils.{Utils, Logging}
-import org.apache.log4j.jmx.LoggerDynamicMBean
+
 
 object Kafka extends Logging {
 
   def main(args: Array[String]): Unit = {
-    val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j"
-    import org.apache.log4j.Logger
-    Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)
 
     if (args.length != 1) {
       println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1377172&r1=1377171&r2=1377172&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
(original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
Sat Aug 25 01:04:59 2012
@@ -61,7 +61,7 @@ private[kafka] class DefaultEventHandler
           sent = true
         }
         catch {
-          case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining))
+          case e => warn("Error sending messages, %d attempts remaining".format(attemptsRemaining),
e)
           if (attemptsRemaining == 0)
             throw e
         }

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Log4jController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Log4jController.scala?rev=1377172&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Log4jController.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Log4jController.scala Sat Aug 25
01:04:59 2012
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+
+import org.apache.log4j.{Logger, Level, LogManager}
+import java.util
+
+
+object Log4jController {
+
+  private val controller = new Log4jController
+
+  Utils.registerMBean(controller, "kafka:type=kafka.Log4jController")
+
+}
+
+
+/**
+ * An MBean that allows the user to dynamically alter log4j levels at runtime.
+ * The companion object contains the singleton instance of this class and
+ * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization
+ * of the companion object.
+ */
+private class Log4jController extends Log4jControllerMBean {
+
+  def getLoggers = {
+    val lst = new util.ArrayList[String]()
+    lst.add("root=" + existingLogger("root").getLevel.toString)
+    val loggers = LogManager.getCurrentLoggers
+    while (loggers.hasMoreElements) {
+      val logger = loggers.nextElement().asInstanceOf[Logger]
+      if (logger != null) {
+        val level =  if (logger != null) logger.getLevel else null
+        lst.add("%s=%s".format(logger.getName, if (level != null) level.toString else "null"))
+      }
+    }
+    lst
+  }
+
+
+  private def newLogger(loggerName: String) =
+    if (loggerName == "root")
+      LogManager.getRootLogger
+    else LogManager.getLogger(loggerName)
+
+
+  private def existingLogger(loggerName: String) =
+    if (loggerName == "root")
+      LogManager.getRootLogger
+    else LogManager.exists(loggerName)
+
+
+  def getLogLevel(loggerName: String) = {
+    val log = existingLogger(loggerName)
+    if (log != null) {
+      val level = log.getLevel
+      if (level != null)
+        log.getLevel.toString
+      else "Null log level."
+    }
+    else "No such logger."
+  }
+
+
+  def setLogLevel(loggerName: String, level: String) = {
+    val log = newLogger(loggerName)
+    if (!loggerName.trim.isEmpty && !level.trim.isEmpty && log != null) {
+      log.setLevel(Level.toLevel(level.toUpperCase))
+      true
+    }
+    else false
+  }
+
+}
+
+
+private trait Log4jControllerMBean {
+  def getLoggers: java.util.List[String]
+  def getLogLevel(logger: String): String
+  def setLogLevel(logger: String, level: String): Boolean
+}
+

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala?rev=1377172&r1=1377171&r2=1377172&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala Sat Aug 25 01:04:59
2012
@@ -24,7 +24,10 @@ trait Logging {
   lazy val logger = Logger.getLogger(loggerName)
 
   protected var logIdent = ""
-  
+
+  // Force initialization to register Log4jControllerMBean
+  private val log4jController = Log4jController
+
   private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
 
   def trace(msg: => String): Unit = {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1377172&r1=1377171&r2=1377172&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Sat Aug 25 01:04:59
2012
@@ -409,7 +409,7 @@ object Utils extends Logging {
    * instead it just returns false indicating the registration failed.
    * @param mbean The object to register as an mbean
    * @param name The name to register this mbean with
-   * @returns true if the registration succeeded
+   * @return true if the registration succeeded
    */
   def registerMBean(mbean: Object, name: String): Boolean = {
     try {
@@ -445,7 +445,7 @@ object Utils extends Logging {
   /**
    * Read an unsigned integer from the current position in the buffer, 
    * incrementing the position by 4 bytes
-   * @param The buffer to read from
+   * @param buffer The buffer to read from
    * @return The integer read, as a long to avoid signedness
    */
   def getUnsignedInt(buffer: ByteBuffer): Long = 



Mime
View raw message