kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1205311 [2/3] - in /incubator/kafka/trunk/core/src: main/scala/kafka/ main/scala/kafka/consumer/ main/scala/kafka/consumer/storage/ main/scala/kafka/javaapi/ main/scala/kafka/javaapi/message/ main/scala/kafka/log/ main/scala/kafka/message/...
Date Wed, 23 Nov 2011 07:21:23 GMT
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala Wed Nov 23 07:21:16 2011
@@ -20,12 +20,11 @@ package kafka.producer
 import async._
 import java.util.Properties
 import kafka.serializer.Encoder
-import org.apache.log4j.Logger
 import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
 import kafka.cluster.{Partition, Broker}
 import kafka.api.ProducerRequest
 import kafka.common.{UnavailableProducerException, InvalidConfigException}
-import kafka.utils.Utils
+import kafka.utils.{Utils, Logging}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
 
 class ProducerPool[V](private val config: ProducerConfig,
@@ -33,9 +32,8 @@ class ProducerPool[V](private val config
                       private val syncProducers: ConcurrentMap[Int, SyncProducer],
                       private val asyncProducers: ConcurrentMap[Int, AsyncProducer[V]],
                       private val inputEventHandler: EventHandler[V] = null,
-                      private val cbkHandler: CallbackHandler[V] = null) {
+                      private val cbkHandler: CallbackHandler[V] = null) extends Logging {
 
-  private val logger = Logger.getLogger(classOf[ProducerPool[V]])
   private var eventHandler = inputEventHandler
   if(eventHandler == null)
     eventHandler = new DefaultEventHandler(config, cbkHandler)
@@ -76,7 +74,7 @@ class ProducerPool[V](private val config
     props.putAll(config.props)
     if(sync) {
         val producer = new SyncProducer(new SyncProducerConfig(props))
-        logger.info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
+        info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
         syncProducers.put(broker.id, producer)
     } else {
         val producer = new AsyncProducer[V](new AsyncProducerConfig(props),
@@ -85,7 +83,7 @@ class ProducerPool[V](private val config
                                             eventHandler, config.eventHandlerProps,
                                             cbkHandler, config.cbkHandlerProps)
         producer.start
-        logger.info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
+        info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
         asyncProducers.put(broker.id, producer)
     }
   }
@@ -107,7 +105,7 @@ class ProducerPool[V](private val config
         val producerRequests = requestsForThisBid._1.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId,
           new ByteBufferMessageSet(compressionCodec = config.compressionCodec,
                                    messages = req.getData.map(d => serializer.toMessage(d)): _*)))
-        logger.debug("Fetching sync producer for broker id: " + bid)
+        debug("Fetching sync producer for broker id: " + bid)
         val producer = syncProducers.get(bid)
         if(producer != null) {
           if(producerRequests.size > 1)
@@ -117,14 +115,14 @@ class ProducerPool[V](private val config
                           partition = producerRequests(0).partition,
                           messages = producerRequests(0).messages)
           config.compressionCodec match {
-            case NoCompressionCodec => logger.debug("Sending message to broker " + bid)
-            case _ => logger.debug("Sending compressed messages to broker " + bid)
+            case NoCompressionCodec => debug("Sending message to broker " + bid)
+            case _ => debug("Sending compressed messages to broker " + bid)
           }
         }else
           throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
             "Sync Producer for broker " + bid + " does not exist in the pool")
       }else {
-        logger.debug("Fetching async producer for broker id: " + bid)
+        debug("Fetching async producer for broker id: " + bid)
         val producer = asyncProducers.get(bid)
         if(producer != null) {
           requestsForThisBid._1.foreach { req =>
@@ -132,8 +130,8 @@ class ProducerPool[V](private val config
           }
           if(logger.isDebugEnabled)
             config.compressionCodec match {
-              case NoCompressionCodec => logger.debug("Sending message")
-              case _ => logger.debug("Sending compressed messages")
+              case NoCompressionCodec => debug("Sending message")
+              case _ => debug("Sending compressed messages")
             }
         }
         else
@@ -149,12 +147,12 @@ class ProducerPool[V](private val config
   def close() = {
     config.producerType match {
       case "sync" =>
-        logger.info("Closing all sync producers")
+        info("Closing all sync producers")
         val iter = syncProducers.values.iterator
         while(iter.hasNext)
           iter.next.close
       case "async" =>
-        logger.info("Closing all async producers")
+        info("Closing all async producers")
         val iter = asyncProducers.values.iterator
         while(iter.hasNext)
           iter.next.close

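(The pattern repeated across the files in this revision replaces each class's private log4j Logger field with the new kafka.utils.Logging mixin, so call sites shrink from level-guarded logger.debug(...) calls to bare debug(...)/info(...)/trace(...) calls. The Logging trait itself is not shown in this part of the diff; the sketch below is only an approximation inferred from the call sites visible here: level-checked helpers plus an exposed logger field, so references such as logger.isTraceEnabled, logger.setLevel(...) and Utils.swallow(logger.warn, ...) keep compiling. Signatures beyond those exercised in the diff, for example a Throwable-only fatal(e) overload that the committed trait presumably provides, are assumptions rather than the committed implementation.)

    package kafka.utils

    import org.apache.log4j.Logger

    trait Logging {
      // exposed (not private) so existing references such as logger.isTraceEnabled,
      // logger.setLevel(...) and Utils.swallow(logger.warn, ...) still compile
      lazy val logger = Logger.getLogger(this.getClass.getName)

      // by-name messages so string concatenation is skipped when the level is off
      def trace(msg: => String) = if (logger.isTraceEnabled) logger.trace(msg)
      def debug(msg: => String) = if (logger.isDebugEnabled) logger.debug(msg)
      def info(msg: => String)  = if (logger.isInfoEnabled)  logger.info(msg)
      def warn(msg: => String)  = logger.warn(msg)
      def error(msg: => String) = logger.error(msg)
      def error(msg: => String, e: Throwable) = logger.error(msg, e)
      def fatal(msg: => String) = logger.fatal(msg)
      def fatal(msg: => String, e: Throwable) = logger.fatal(msg, e)
    }
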
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala Wed Nov 23 07:21:16 2011
@@ -24,7 +24,6 @@ import kafka.network._
 import kafka.utils._
 import kafka.api._
 import scala.math._
-import org.apache.log4j.{Level, Logger}
 import kafka.common.MessageSizeTooLargeException
 import java.nio.ByteBuffer
 
@@ -36,9 +35,8 @@ object SyncProducer {
  * Send a message set.
  */
 @threadsafe
-class SyncProducer(val config: SyncProducerConfig) {
+class SyncProducer(val config: SyncProducerConfig) extends Logging {
   
-  private val logger = Logger.getLogger(getClass())
   private val MaxConnectBackoffMs = 60000
   private var channel : SocketChannel = null
   private var sentOnConnection = 0
@@ -46,11 +44,11 @@ class SyncProducer(val config: SyncProdu
   @volatile
   private var shutdown: Boolean = false
 
-  logger.debug("Instantiating Scala Sync Producer")
+  debug("Instantiating Scala Sync Producer")
 
   private def verifySendBuffer(buffer : ByteBuffer) = {
     if (logger.isTraceEnabled) {
-      logger.trace("verifying sendbuffer of size " + buffer.limit)
+      trace("verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
       if (requestTypeId == RequestKeys.MultiProduce) {
         try {
@@ -59,17 +57,17 @@ class SyncProducer(val config: SyncProdu
             try {
               for (messageAndOffset <- produce.messages)
                 if (!messageAndOffset.message.isValid)
-                  logger.trace("topic " + produce.topic + " is invalid")
+                  trace("topic " + produce.topic + " is invalid")
             }
             catch {
               case e: Throwable =>
-              logger.trace("error iterating messages " + e + Utils.stackTrace(e))
+              trace("error iterating messages " + e + Utils.stackTrace(e))
             }
           }
         }
         catch {
           case e: Throwable =>
-            logger.trace("error verifying sendbuffer " + e + Utils.stackTrace(e))
+            trace("error verifying sendbuffer " + e + Utils.stackTrace(e))
         }
       }
     }
@@ -112,8 +110,7 @@ class SyncProducer(val config: SyncProdu
   def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
     verifyMessageSize(messages)
     val setSize = messages.sizeInBytes.asInstanceOf[Int]
-    if(logger.isTraceEnabled)
-      logger.trace("Got message set with " + setSize + " bytes to send")
+    trace("Got message set with " + setSize + " bytes to send")
     send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
   }
  
@@ -123,8 +120,7 @@ class SyncProducer(val config: SyncProdu
     for (request <- produces)
       verifyMessageSize(request.messages)
     val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
-    if(logger.isTraceEnabled)
-      logger.trace("Got multi message sets with " + setSize + " bytes to send")
+    trace("Got multi message sets with " + setSize + " bytes to send")
     send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
   }
 
@@ -148,13 +144,13 @@ class SyncProducer(val config: SyncProdu
   private def disconnect() {
     try {
       if(channel != null) {
-        logger.info("Disconnecting from " + config.host + ":" + config.port)
+        info("Disconnecting from " + config.host + ":" + config.port)
         Utils.swallow(logger.warn, channel.close())
         Utils.swallow(logger.warn, channel.socket.close())
         channel = null
       }
     } catch {
-      case e: Exception => logger.error("Error on disconnect: ", e)
+      case e: Exception => error("Error on disconnect: ", e)
     }
   }
     
@@ -170,7 +166,7 @@ class SyncProducer(val config: SyncProdu
         channel.socket.setSoTimeout(config.socketTimeoutMs)
         channel.socket.setKeepAlive(true)
         channel.connect(new InetSocketAddress(config.host, config.port))
-        logger.info("Connected to " + config.host + ":" + config.port + " for producing")
+        info("Connected to " + config.host + ":" + config.port + " for producing")
       }
       catch {
         case e: Exception => {
@@ -178,10 +174,10 @@ class SyncProducer(val config: SyncProdu
           val endTimeMs = SystemTime.milliseconds
           if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs)
           {
-            logger.error("Producer connection timing out after " + config.connectTimeoutMs + " ms", e)
+            error("Producer connection timing out after " + config.connectTimeoutMs + " ms", e)
             throw e
           }
-          logger.error("Connection attempt failed, next attempt in " + connectBackoffMs + " ms", e)
+          error("Connection attempt failed, next attempt in " + connectBackoffMs + " ms", e)
           SystemTime.sleep(connectBackoffMs)
           connectBackoffMs = min(10 * connectBackoffMs, MaxConnectBackoffMs)
         }
@@ -219,8 +215,7 @@ class SyncProducerStats extends SyncProd
   def getNumProduceRequests: Long = produceRequestStats.getNumRequests
 }
 
-object SyncProducerStats {
-  private val logger = Logger.getLogger(getClass())
+object SyncProducerStats extends Logging {
   private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
   private val stats = new SyncProducerStats
   Utils.swallow(logger.warn, Utils.registerMBean(stats, kafkaProducerstatsMBeanName))

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala Wed Nov 23 07:21:16 2011
@@ -19,7 +19,7 @@ package kafka.producer
 import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
 import collection.mutable.HashMap
 import collection.mutable.Map
-import org.apache.log4j.Logger
+import kafka.utils.Logging
 import collection.immutable.TreeSet
 import kafka.cluster.{Broker, Partition}
 import org.apache.zookeeper.Watcher.Event.KeeperState
@@ -57,8 +57,7 @@ private[producer] object ZKBrokerPartiti
  * If zookeeper based auto partition discovery is enabled, fetch broker info like
  * host, port, number of partitions from zookeeper
  */
-private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (Int, String, Int) => Unit) extends BrokerPartitionInfo {
-  private val logger = Logger.getLogger(classOf[ZKBrokerPartitionInfo])
+private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (Int, String, Int) => Unit) extends BrokerPartitionInfo with Logging {
   private val zkWatcherLock = new Object
   private val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
     ZKStringSerializer)
@@ -75,7 +74,7 @@ private[producer] class ZKBrokerPartitio
   // register listener for change of brokers for each topic to keep topicsBrokerPartitions updated
   topicBrokerPartitions.keySet.foreach {topic =>
     zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic, brokerTopicsListener)
-    logger.debug("Registering listener on path: " + ZkUtils.BrokerTopicsPath + "/" + topic)
+    debug("Registering listener on path: " + ZkUtils.BrokerTopicsPath + "/" + topic)
   }
 
   // register listener for new broker
@@ -138,20 +137,17 @@ private[producer] class ZKBrokerPartitio
   }
 
   private def bootstrapWithExistingBrokers(topic: String): scala.collection.immutable.SortedSet[Partition] = {
-    if(logger.isDebugEnabled) logger.debug("Currently, no brokers are registered under topic: " + topic)
-    if(logger.isDebugEnabled)
-      logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
+   debug("Currently, no brokers are registered under topic: " + topic)
+    debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
       "number of partitions = 1")
     val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath)
-    if(logger.isTraceEnabled)
-      logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
+    trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
     // since we do not have the in formation about number of partitions on these brokers, just assume single partition
     // i.e. pick partition 0 from each broker as a candidate
     val numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0))
     // add the rest of the available brokers with default 1 partition for this topic, so all of the brokers
     // participate in hosting this topic.
-    if(logger.isDebugEnabled)
-      logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
+    debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
     numBrokerPartitions
   }
 
@@ -171,8 +167,7 @@ private[producer] class ZKBrokerPartitio
       val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
       val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions)
       val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)
-      if(logger.isDebugEnabled)
-        logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
+      debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
 
       var brokerParts = SortedSet.empty[Partition]
       sortedBrokerPartitions.foreach { bp =>
@@ -182,8 +177,7 @@ private[producer] class ZKBrokerPartitio
         }
       }
       brokerPartitionsPerTopic += (topic -> brokerParts)
-      if(logger.isDebugEnabled)
-        logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString)
+      debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString)
     }
     brokerPartitionsPerTopic
   }
@@ -208,17 +202,14 @@ private[producer] class ZKBrokerPartitio
    * keeps the related data structures updated
    */
   class BrokerTopicsListener(val originalBrokerTopicsPartitionsMap: collection.mutable.Map[String, SortedSet[Partition]],
-                             val originalBrokerIdMap: Map[Int, Broker]) extends IZkChildListener {
+                             val originalBrokerIdMap: Map[Int, Broker]) extends IZkChildListener with Logging {
     private var oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++
                                               originalBrokerTopicsPartitionsMap
     private var oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ originalBrokerIdMap
-    private val logger = Logger.getLogger(classOf[BrokerTopicsListener])
 
-    if(logger.isDebugEnabled)
-      logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" +
+    debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" +
       "/broker/topics, /broker/topics/topic, /broker/ids")
-    if(logger.isDebugEnabled)
-      logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " +
+    debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " +
       "partition id per topic with " + oldBrokerTopicPartitionsMap.toString)
 
     @throws(classOf[Exception])
@@ -227,22 +218,18 @@ private[producer] class ZKBrokerPartitio
                                               else new java.util.ArrayList[String]()
 
       zkWatcherLock synchronized {
-        if(logger.isTraceEnabled)
-          logger.trace("Watcher fired for path: " + parentPath + " with change " + curChilds.toString)
+        trace("Watcher fired for path: " + parentPath + " with change " + curChilds.toString)
         import scala.collection.JavaConversions._
 
         parentPath match {
           case "/brokers/topics" =>        // this is a watcher for /broker/topics path
             val updatedTopics = asBuffer(curChilds)
-            if(logger.isDebugEnabled) {
-              logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
+            debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
                 curChilds.toString)
-              logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
-              logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString)
-            }
+            debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
+            debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString)
             val newTopics = updatedTopics.toSet &~ oldBrokerTopicPartitionsMap.keySet
-            if(logger.isDebugEnabled)
-              logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString)
+            debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString)
             newTopics.foreach { topic =>
               val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
               val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath)
@@ -251,16 +238,14 @@ private[producer] class ZKBrokerPartitio
                 brokerTopicsListener)
             }
           case "/brokers/ids"    =>        // this is a watcher for /broker/ids path
-            if(logger.isDebugEnabled)
-              logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
+            debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
                 "\t Currently registered list of brokers -> " + curChilds.toString)
             processBrokerChange(parentPath, curChilds)
           case _ =>
             val pathSplits = parentPath.split("/")
             val topic = pathSplits.last
             if(pathSplits.length == 4 && pathSplits(2).equals("topics")) {
-              if(logger.isDebugEnabled)
-                logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
+              debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
                   " list of brokers -> " + curChilds.toString + " for topic -> " + topic)
               processNewBrokerInExistingTopic(topic, asBuffer(curChilds))
             }
@@ -277,18 +262,17 @@ private[producer] class ZKBrokerPartitio
         import scala.collection.JavaConversions._
         val updatedBrokerList = asBuffer(curChilds).map(bid => bid.toInt)
         val newBrokers = updatedBrokerList.toSet &~ oldBrokerIdMap.keySet
-        if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
+        debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
         newBrokers.foreach { bid =>
           val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
           val brokerHostPort = brokerInfo.split(":")
           allBrokers += (bid -> new Broker(bid, brokerHostPort(1), brokerHostPort(1), brokerHostPort(2).toInt))
-          if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
+          debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
           producerCbk(bid, brokerHostPort(1), brokerHostPort(2).toInt)
         }
         // remove dead brokers from the in memory list of live brokers
         val deadBrokers = oldBrokerIdMap.keySet &~ updatedBrokerList.toSet
-        if(logger.isDebugEnabled)
-          logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
+        debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
         deadBrokers.foreach {bid =>
           allBrokers = allBrokers - bid
           // also remove this dead broker from particular topics
@@ -297,8 +281,7 @@ private[producer] class ZKBrokerPartitio
               case Some(oldBrokerPartitionList) =>
                 val aliveBrokerPartitionList = oldBrokerPartitionList.filter(bp => bp.brokerId != bid)
                 topicBrokerPartitions += (topic -> aliveBrokerPartitionList)
-                if(logger.isDebugEnabled)
-                  logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " +
+                debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " +
                   "Updated list of broker id, partition id = " + aliveBrokerPartitionList.toString)
               case None =>
             }
@@ -317,23 +300,20 @@ private[producer] class ZKBrokerPartitio
       // find the old list of brokers for this topic
       oldBrokerTopicPartitionsMap.get(topic) match {
         case Some(brokersParts) =>
-          if(logger.isDebugEnabled)
-            logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString)
+          debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString)
         case None =>
       }
 
       val updatedBrokerList = curChilds.map(b => b.toInt)
       import ZKBrokerPartitionInfo._
       val updatedBrokerParts:SortedSet[Partition] = getBrokerPartitions(zkClient, topic, updatedBrokerList.toList)
-      if(logger.isDebugEnabled)
-        logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " +
+      debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " +
           curChilds.toString)
       // update the number of partitions on existing brokers
       var mergedBrokerParts: SortedSet[Partition] = TreeSet[Partition]() ++ updatedBrokerParts
       topicBrokerPartitions.get(topic) match {
         case Some(oldBrokerParts) =>
-          if(logger.isDebugEnabled)
-            logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " +
+          debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " +
             oldBrokerParts.toString)
           mergedBrokerParts = oldBrokerParts ++ updatedBrokerParts
         case None =>
@@ -341,24 +321,19 @@ private[producer] class ZKBrokerPartitio
       // keep only brokers that are alive
       mergedBrokerParts = mergedBrokerParts.filter(bp => allBrokers.contains(bp.brokerId))
       topicBrokerPartitions += (topic -> mergedBrokerParts)
-      if(logger.isDebugEnabled)
-        logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " +
+      debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " +
           mergedBrokerParts.toString)
     }
 
     def resetState = {
-      if(logger.isTraceEnabled)
-        logger.trace("[BrokerTopicsListener] Before reseting broker topic partitions state " +
+      trace("[BrokerTopicsListener] Before reseting broker topic partitions state " +
           oldBrokerTopicPartitionsMap.toString)
       oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ topicBrokerPartitions
-      if(logger.isDebugEnabled)
-        logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " +
+      debug("[BrokerTopicsListener] After reseting broker topic partitions state " +
           oldBrokerTopicPartitionsMap.toString)
-      if(logger.isTraceEnabled)
-        logger.trace("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString)
+      trace("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString)
       oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++  allBrokers
-      if(logger.isDebugEnabled)
-        logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString)
+      debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString)
     }
   }
 
@@ -386,7 +361,7 @@ private[producer] class ZKBrokerPartitio
        *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
        *  connection for us.
        */
-      logger.info("ZK expired; release old list of broker partitions for topics ")
+      info("ZK expired; release old list of broker partitions for topics ")
       topicBrokerPartitions = getZKTopicPartitionInfo
       allBrokers = getZKBrokerInfo
       brokerTopicsListener.resetState

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Wed Nov 23 07:21:16 2011
@@ -18,9 +18,8 @@
 package kafka.producer.async
 
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
-import kafka.utils.Utils
+import kafka.utils.{Utils, Logging}
 import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.log4j.{Level, Logger}
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
 import java.util.{Random, Properties}
@@ -39,8 +38,7 @@ private[kafka] class AsyncProducer[T](co
                                       eventHandler: EventHandler[T] = null,
                                       eventHandlerProps: Properties = null,
                                       cbkHandler: CallbackHandler[T] = null,
-                                      cbkHandlerProps: Properties = null) {
-  private val logger = Logger.getLogger(classOf[AsyncProducer[T]])
+                                      cbkHandlerProps: Properties = null) extends Logging {
   private val closed = new AtomicBoolean(false)
   private val queue = new LinkedBlockingQueue[QueueItem[T]](config.queueSize)
   // initialize the callback handlers
@@ -98,7 +96,7 @@ private[kafka] class AsyncProducer[T](co
           case e: InterruptedException =>
             val msg = "%s interrupted during enqueue of event %s.".format(
               getClass.getSimpleName, event.toString)
-            logger.error(msg)
+            error(msg)
             throw new AsyncProducerInterruptedException(msg)
         }
     }
@@ -134,6 +132,7 @@ private[kafka] class AsyncProducer[T](co
   }
 
   // for testing only
+  import org.apache.log4j.Level
   def setLoggerLevel(level: Level) = logger.setLevel(level)
 }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala Wed Nov 23 07:21:16 2011
@@ -42,7 +42,7 @@ class AsyncProducerQueueSizeStats[T](pri
 object AsyncProducerStats {
   private val logger = Logger.getLogger(getClass())
   private val stats = new AsyncProducerStats
-  Utils.swallow(logger.warn, Utils.registerMBean(stats, AsyncProducer.ProducerMBeanName))
+  Utils.registerMBean(stats, AsyncProducer.ProducerMBeanName)
 
   def recordDroppedEvents = stats.recordDroppedEvents
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Wed Nov 23 07:21:16 2011
@@ -19,18 +19,16 @@ package kafka.producer.async
 
 import collection.mutable.HashMap
 import collection.mutable.Map
-import org.apache.log4j.Logger
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
 import java.util.Properties
+import kafka.utils.Logging
 import kafka.producer.{ProducerConfig, SyncProducer}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
 
 
 private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
-                                            val cbkHandler: CallbackHandler[T]) extends EventHandler[T] {
-
-  private val logger = Logger.getLogger(classOf[DefaultEventHandler[T]])
+                                            val cbkHandler: CallbackHandler[T]) extends EventHandler[T] with Logging {
 
   override def init(props: Properties) { }
 
@@ -40,7 +38,7 @@ private[kafka] class DefaultEventHandler
       processedEvents = cbkHandler.beforeSendingData(events)
 
     if(logger.isTraceEnabled)
-      processedEvents.foreach(event => logger.trace("Handling event for Topic: %s, Partition: %d"
+      processedEvents.foreach(event => trace("Handling event for Topic: %s, Partition: %d"
         .format(event.getTopic, event.getPartition)))
 
     send(serialize(collate(processedEvents), serializer), syncProducer)
@@ -50,8 +48,7 @@ private[kafka] class DefaultEventHandler
     if(messagesPerTopic.size > 0) {
       val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
       syncProducer.multiSend(requests)
-      if(logger.isTraceEnabled)
-        logger.trace("kafka producer sent messages for topics " + messagesPerTopic)
+      trace("kafka producer sent messages for topics " + messagesPerTopic)
     }
   }
 
@@ -69,27 +66,23 @@ private[kafka] class DefaultEventHandler
       ((topicAndEvents._1._1, topicAndEvents._1._2),
         config.compressionCodec match {
           case NoCompressionCodec =>
-            if(logger.isTraceEnabled)
-              logger.trace("Sending %d messages with no compression to topic %s on partition %d"
+            trace("Sending %d messages with no compression to topic %s on partition %d"
                 .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2))
             new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
           case _ =>
             config.compressedTopics.size match {
               case 0 =>
-                if(logger.isTraceEnabled)
-                  logger.trace("Sending %d messages with compression codec %d to topic %s on partition %d"
+                trace("Sending %d messages with compression codec %d to topic %s on partition %d"
                     .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2))
                 new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
               case _ =>
                 if(config.compressedTopics.contains(topicAndEvents._1._1)) {
-                  if(logger.isTraceEnabled)
-                    logger.trace("Sending %d messages with compression codec %d to topic %s on partition %d"
+                  trace("Sending %d messages with compression codec %d to topic %s on partition %d"
                       .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2))
                   new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
                 }
                 else {
-                  if(logger.isTraceEnabled)
-                    logger.trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
+                  trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
                       .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, topicAndEvents._1._1,
                       config.compressedTopics.toString))
                   new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Nov 23 07:21:16 2011
@@ -17,9 +17,8 @@
 
 package kafka.producer.async
 
-import kafka.utils.SystemTime
+import kafka.utils.{SystemTime, Logging}
 import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
-import org.apache.log4j.Logger
 import collection.mutable.ListBuffer
 import kafka.serializer.Encoder
 import kafka.producer.SyncProducer
@@ -32,25 +31,23 @@ private[async] class ProducerSendThread[
                                            val cbkHandler: CallbackHandler[T],
                                            val queueTime: Long,
                                            val batchSize: Int,
-                                           val shutdownCommand: Any) extends Thread(threadName) {
+                                           val shutdownCommand: Any) extends Thread(threadName) with Logging {
 
-  private val logger = Logger.getLogger(classOf[ProducerSendThread[T]])
   private val shutdownLatch = new CountDownLatch(1)
 
   override def run {
 
     try {
       val remainingEvents = processEvents
-      if(logger.isDebugEnabled) logger.debug("Remaining events = " + remainingEvents.size)
+      debug("Remaining events = " + remainingEvents.size)
 
       // handle remaining events
       if(remainingEvents.size > 0) {
-        if(logger.isDebugEnabled)
-           logger.debug("Dispatching last batch of %d events to the event handler".format(remainingEvents.size))
+        debug("Dispatching last batch of %d events to the event handler".format(remainingEvents.size))
         tryToHandle(remainingEvents)
       }
     }catch {
-      case e: Exception => logger.error("Error in sending events: ", e)
+      case e: Exception => error("Error in sending events: ", e)
     }finally {
       shutdownLatch.countDown
     }
@@ -60,7 +57,7 @@ private[async] class ProducerSendThread[
 
   def shutdown = {
     handler.close
-    logger.info("Shutdown thread complete")
+    info("Shutdown thread complete")
   }
 
   private def processEvents(): Seq[QueueItem[T]] = {
@@ -77,8 +74,7 @@ private[async] class ProducerSendThread[
         // returns a null object
         val expired = currentQueueItem == null
         if(currentQueueItem != null) {
-          if(logger.isTraceEnabled)
-            logger.trace("Dequeued item for topic %s and partition %d"
+          trace("Dequeued item for topic %s and partition %d"
               .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
           // handle the dequeued current item
           if(cbkHandler != null)
@@ -90,10 +86,8 @@ private[async] class ProducerSendThread[
           full = events.size >= batchSize
         }
         if(full || expired) {
-          if(logger.isDebugEnabled) {
-            if(expired) logger.debug(elapsed + " ms elapsed. Queue time reached. Sending..")
-            if(full) logger.debug("Batch full. Sending..")
-          }
+          if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
+          if(full) debug("Batch full. Sending..")
           // if either queue time has reached or batch size has reached, dispatch to event handler
           tryToHandle(events)
           lastSend = SystemTime.milliseconds
@@ -104,7 +98,7 @@ private[async] class ProducerSendThread[
       throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
         .format(queue.size))
     if(cbkHandler != null) {
-      logger.info("Invoking the callback handler before handling the last batch of %d events".format(events.size))
+      info("Invoking the callback handler before handling the last batch of %d events".format(events.size))
       val addedEvents = cbkHandler.lastBatchBeforeClose
       logEvents("last batch before close", addedEvents)
       events = events ++ addedEvents
@@ -114,19 +108,19 @@ private[async] class ProducerSendThread[
 
   def tryToHandle(events: Seq[QueueItem[T]]) {
     try {
-      if(logger.isDebugEnabled) logger.debug("Handling " + events.size + " events")
+      debug("Handling " + events.size + " events")
       if(events.size > 0)
         handler.handle(events, underlyingProducer, serializer)
     }catch {
-      case e: Exception => logger.error("Error in handling batch of " + events.size + " events", e)
+      case e: Exception => error("Error in handling batch of " + events.size + " events", e)
     }
   }
 
   private def logEvents(tag: String, events: Iterable[QueueItem[T]]) {
     if(logger.isTraceEnabled) {
-      logger.trace("events for " + tag + ":")
+      trace("events for " + tag + ":")
       for (event <- events)
-        logger.trace(event.getData.toString)
+        trace(event.getData.toString)
     }
   }
 }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Wed Nov 23 07:21:16 2011
@@ -24,14 +24,14 @@ import kafka.message._
 import kafka.api._
 import kafka.common.ErrorMapping
 import kafka.utils.SystemTime
+import kafka.utils.Logging
 import java.io.IOException
 
 /**
  * Logic to handle the various Kafka requests
  */
-private[kafka] class KafkaRequestHandlers(val logManager: LogManager) {
+private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Logging {
   
-  private val logger = Logger.getLogger(classOf[KafkaRequestHandlers])
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
   def handlerFor(requestTypeId: Short, request: Receive): Handler.Handler = {
@@ -52,8 +52,7 @@ private[kafka] class KafkaRequestHandler
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Producer request " + request.toString)
     handleProducerRequest(request, "ProduceRequest")
-    if (logger.isDebugEnabled)
-      logger.debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
+    debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
     None
   }
 
@@ -69,15 +68,14 @@ private[kafka] class KafkaRequestHandler
     val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
     try {
       logManager.getOrCreateLog(request.topic, partition).append(request.messages)
-      if(logger.isTraceEnabled)
-        logger.trace(request.messages.sizeInBytes + " bytes written to logs.")
+      trace(request.messages.sizeInBytes + " bytes written to logs.")
     }
     catch {
       case e =>
-        logger.error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
+        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
         e match {
           case _: IOException =>
-            logger.fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+            fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
             Runtime.getRuntime.halt(1)
           case _ =>
         }
@@ -107,7 +105,7 @@ private[kafka] class KafkaRequestHandler
   private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = {
     var  response: MessageSetSend = null
     try {
-      logger.trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition)
+      trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition)
       val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
       if (log != null)
         response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
@@ -116,7 +114,7 @@ private[kafka] class KafkaRequestHandler
     }
     catch {
       case e =>
-        logger.error("error when processing request " + fetchRequest, e)
+        error("error when processing request " + fetchRequest, e)
         response=new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }
     response

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Wed Nov 23 07:21:16 2011
@@ -18,11 +18,10 @@
 package kafka.server
 
 import scala.reflect.BeanProperty
-import org.apache.log4j.Logger
 import kafka.log.LogManager
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler}
+import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging}
 import kafka.network.{SocketServerStats, SocketServer}
 import java.io.File
 
@@ -30,11 +29,10 @@ import java.io.File
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig) {
+class KafkaServer(val config: KafkaConfig) extends Logging {
   val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown"
   private val isShuttingDown = new AtomicBoolean(false)
   
-  private val logger = Logger.getLogger(classOf[KafkaServer])
   private val shutdownLatch = new CountDownLatch(1)
   private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
   
@@ -50,7 +48,7 @@ class KafkaServer(val config: KafkaConfi
    */
   def startup() {
     try {
-      logger.info("Starting Kafka server...")
+      info("Starting Kafka server...")
       var needRecovery = true
       val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
       if (cleanShutDownFile.exists) {
@@ -70,7 +68,7 @@ class KafkaServer(val config: KafkaConfi
                                       config.monitoringPeriodSecs,
                                       handlers.handlerFor,
                                       config.maxSocketRequestSize)
-      Utils.swallow(logger.warn, Utils.registerMBean(socketServer.stats, statsMBeanName))
+      Utils.registerMBean(socketServer.stats, statsMBeanName)
       socketServer.startup
       Mx4jLoader.maybeLoad
       /**
@@ -78,11 +76,11 @@ class KafkaServer(val config: KafkaConfi
        *  So this should happen after socket server start.
        */
       logManager.startup
-      logger.info("Server started.")
+      info("Server started.")
     }
     catch {
       case e =>
-        logger.fatal("Fatal error during startup.", e)
+        fatal("Fatal error during startup.", e)
         shutdown
     }
   }
@@ -94,12 +92,12 @@ class KafkaServer(val config: KafkaConfi
   def shutdown() {
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
-      logger.info("Shutting down...")
+      info("Shutting down...")
       try {
         scheduler.shutdown()
         if (socketServer != null)
           socketServer.shutdown()
-        Utils.swallow(logger.warn, Utils.unregisterMBean(statsMBeanName))
+        Utils.unregisterMBean(statsMBeanName)
         if (logManager != null)
           logManager.close()
 
@@ -108,11 +106,11 @@ class KafkaServer(val config: KafkaConfi
       }
       catch {
         case e =>
-          logger.fatal(e)
-          logger.fatal(Utils.stackTrace(e))
+          fatal(e)
+          fatal(Utils.stackTrace(e))
       }
       shutdownLatch.countDown()
-      logger.info("shut down completed")
+      info("shut down completed")
     }
   }
   

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala Wed Nov 23 07:21:16 2011
@@ -17,11 +17,10 @@
 
 package kafka.server
 
-import kafka.utils.Utils
+import kafka.utils.{Utils, Logging}
 import kafka.consumer._
 import kafka.producer.{ProducerData, ProducerConfig, Producer}
 import kafka.message.Message
-import org.apache.log4j.Logger
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.Map
@@ -62,9 +61,7 @@ class KafkaServerStartable(val serverCon
 
 class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
                        private val producerConfig: ProducerConfig,
-                       private val kafkaServer: KafkaServer) extends TopicEventHandler[String] {
-
-  private val logger = Logger.getLogger(getClass)
+                       private val kafkaServer: KafkaServer) extends TopicEventHandler[String] with Logging {
 
   private val whiteListTopics =
     consumerConfig.mirrorTopicsWhitelist.split(",").toList.map(_.trim)
@@ -96,16 +93,16 @@ class EmbeddedConsumer(private val consu
 
     val addedTopics = newMirrorTopics filterNot (mirrorTopics contains)
     if (addedTopics.nonEmpty)
-      logger.info("topic event: added topics = %s".format(addedTopics))
+      info("topic event: added topics = %s".format(addedTopics))
 
     val deletedTopics = mirrorTopics filterNot (newMirrorTopics contains)
     if (deletedTopics.nonEmpty)
-      logger.info("topic event: deleted topics = %s".format(deletedTopics))
+      info("topic event: deleted topics = %s".format(deletedTopics))
 
     mirrorTopics = newMirrorTopics
 
     if (addedTopics.nonEmpty || deletedTopics.nonEmpty) {
-      logger.info("mirror topics = %s".format(mirrorTopics))
+      info("mirror topics = %s".format(mirrorTopics))
       startNewConsumerThreads(makeTopicMap(mirrorTopics))
     }
   }
@@ -142,7 +139,7 @@ class EmbeddedConsumer(private val consu
       threadList.foreach(_.start)
     }
     else
-      logger.info("Not starting mirroring threads (mirror topic list is empty)")
+      info("Not starting mirroring threads (mirror topic list is empty)")
   }
 
   def startup() {
@@ -157,29 +154,28 @@ class EmbeddedConsumer(private val consu
     // first shutdown the topic watcher to prevent creating new consumer streams
     if (topicEventWatcher != null)
       topicEventWatcher.shutdown()
-    logger.info("Stopped the ZK watcher for new topics, now stopping the Kafka consumers")
+    info("Stopped the ZK watcher for new topics, now stopping the Kafka consumers")
     // stop pulling more data for mirroring
     if (consumerConnector != null)
       consumerConnector.shutdown()
-    logger.info("Stopped the kafka consumer threads for existing topics, now stopping the existing mirroring threads")
+    info("Stopped the kafka consumer threads for existing topics, now stopping the existing mirroring threads")
     // wait for all mirroring threads to stop
     threadList.foreach(_.shutdown)
-    logger.info("Stopped all existing mirroring threads, now stopping the producer")
+    info("Stopped all existing mirroring threads, now stopping the producer")
     // only then, shutdown the producer
     producer.close()
-    logger.info("Successfully shutdown this Kafka mirror")
+    info("Successfully shutdown this Kafka mirror")
   }
 
-  class MirroringThread(val stream: KafkaMessageStream[Message], val topic: String, val threadId: Int) extends Thread {
+  class MirroringThread(val stream: KafkaMessageStream[Message], val topic: String, val threadId: Int) extends Thread with Logging {
     val shutdownComplete = new CountDownLatch(1)
     val name = "kafka-embedded-consumer-%s-%d".format(topic, threadId)
     this.setDaemon(false)
     this.setName(name)
 
-    private val logger = Logger.getLogger(name)
 
     override def run = {
-      logger.info("Starting mirroring thread %s for topic %s and stream %d".format(name, topic, threadId))
+      info("Starting mirroring thread %s for topic %s and stream %d".format(name, topic, threadId))
 
       try {
         for (message <- stream) {
@@ -189,11 +185,11 @@ class EmbeddedConsumer(private val consu
       }
       catch {
         case e =>
-          logger.fatal(e + Utils.stackTrace(e))
-          logger.fatal(topic + " stream " + threadId + " unexpectedly exited")
+          fatal(e + Utils.stackTrace(e))
+          fatal(topic + " stream " + threadId + " unexpectedly exited")
       }finally {
         shutdownComplete.countDown
-        logger.info("Stopped mirroring thread %s for topic %s and stream %d".format(name, topic, threadId))
+        info("Stopped mirroring thread %s for topic %s and stream %d".format(name, topic, threadId))
       }
     }
 
@@ -201,7 +197,7 @@ class EmbeddedConsumer(private val consu
       try {
         shutdownComplete.await
       }catch {
-        case e: InterruptedException => logger.fatal("Shutdown of thread " + name + " interrupted. " +
+        case e: InterruptedException => fatal("Shutdown of thread " + name + " interrupted. " +
           "Mirroring thread might leak data!")
       }
     }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Wed Nov 23 07:21:16 2011
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.utils._
-import org.apache.log4j.Logger
 import kafka.cluster.Broker
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -32,9 +31,7 @@ import java.net.InetAddress
  *   /brokers/[0...N] --> host:port
  * 
  */
-class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) {
-  
-  private val logger = Logger.getLogger(classOf[KafkaZooKeeper])
+class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) extends Logging {
   
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
   var zkClient: ZkClient = null
@@ -43,13 +40,13 @@ class KafkaZooKeeper(config: KafkaConfig
   
   def startup() {
     /* start client */
-    logger.info("connecting to ZK: " + config.zkConnect)
+    info("connecting to ZK: " + config.zkConnect)
     zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
     zkClient.subscribeStateChanges(new SessionExpireListener)
   }
 
   def registerBrokerInZk() {
-    logger.info("Registering broker " + brokerIdPath)
+    info("Registering broker " + brokerIdPath)
     val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
     val creatorId = hostName + "-" + System.currentTimeMillis
     val broker = new Broker(config.brokerId, creatorId, hostName, config.port)
@@ -62,7 +59,7 @@ class KafkaZooKeeper(config: KafkaConfig
                                    "else you have shutdown this broker and restarted it faster than the zookeeper " + 
                                    "timeout so it appears to be re-registering.")
     }
-    logger.info("Registering broker " + brokerIdPath + " succeeded with " + broker)
+    info("Registering broker " + brokerIdPath + " succeeded with " + broker)
   }
 
   def registerTopicInZk(topic: String) {
@@ -75,9 +72,9 @@ class KafkaZooKeeper(config: KafkaConfig
   def registerTopicInZkInternal(topic: String) {
     val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + config.brokerId
     val numParts = logManager.getTopicPartitionsMap.getOrElse(topic, config.numPartitions)
-    logger.info("Begin registering broker topic " + brokerTopicPath + " with " + numParts.toString + " partitions")
+    info("Begin registering broker topic " + brokerTopicPath + " with " + numParts.toString + " partitions")
     ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerTopicPath, numParts.toString)
-    logger.info("End registering broker topic " + brokerTopicPath)
+    info("End registering broker topic " + brokerTopicPath)
   }
 
   /**
@@ -99,20 +96,20 @@ class KafkaZooKeeper(config: KafkaConfig
      */
     @throws(classOf[Exception])
     def handleNewSession() {
-      logger.info("re-registering broker info in ZK for broker " + config.brokerId)
+      info("re-registering broker info in ZK for broker " + config.brokerId)
       registerBrokerInZk()
       lock synchronized {
-        logger.info("re-registering broker topics in ZK for broker " + config.brokerId)
+        info("re-registering broker topics in ZK for broker " + config.brokerId)
         for (topic <- topics)
           registerTopicInZkInternal(topic)
       }
-      logger.info("done re-registering broker")
+      info("done re-registering broker")
     }
   }
 
   def close() {
     if (zkClient != null) {
-      logger.info("Closing zookeeper client...")
+      info("Closing zookeeper client...")
       zkClient.close()
     }
   } 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala Wed Nov 23 07:21:16 2011
@@ -20,12 +20,10 @@ package kafka.tools
 
 import joptsimple._
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, ZKStringSerializer}
-import org.apache.log4j.Logger
+import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import kafka.consumer.SimpleConsumer
 import collection.mutable.Map
-object ConsumerOffsetChecker {
-  private val logger = Logger.getLogger(getClass)
+object ConsumerOffsetChecker extends Logging {
 
   private val consumerMap: Map[String, Option[SimpleConsumer]] = Map()
 
@@ -40,7 +38,7 @@ object ConsumerOffsetChecker {
       case BrokerIpPattern(ip, port) =>
         Some(new SimpleConsumer(ip, port.toInt, 10000, 100000))
       case _ =>
-        logger.error("Could not parse broker info %s".format(brokerInfo))
+        error("Could not parse broker info %s".format(brokerInfo))
         None
     }
     consumer
@@ -75,7 +73,7 @@ object ConsumerOffsetChecker {
           case None => // ignore
         }
       case _ =>
-        logger.error("Could not parse broker/partition pair %s".format(bidPid))
+        error("Could not parse broker/partition pair %s".format(bidPid))
     }
   }
 
@@ -139,7 +137,7 @@ object ConsumerOffsetChecker {
           zkClient, "/consumers/%s/offsets".format(group)).toList
       }
 
-      logger.debug("zkConnect = %s; topics = %s; group = %s".format(
+      debug("zkConnect = %s; topics = %s; group = %s".format(
         zkConnect, topicList.toString(), group))
 
       topicList.foreach {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala Wed Nov 23 07:21:16 2011
@@ -22,8 +22,7 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
 import joptsimple._
-import org.apache.log4j.Logger
-import kafka.utils.Utils
+import kafka.utils.{Utils, Logging}
 import kafka.consumer.{ConsumerConfig, ConsumerConnector, Consumer}
 
 abstract class ShutdownableThread(name: String) extends Thread(name) {
@@ -33,8 +32,7 @@ abstract class ShutdownableThread(name: 
 /**
  * Performance test for the full zookeeper consumer
  */
-object ConsumerPerformance {
-  private val logger = Logger.getLogger(getClass())
+object ConsumerPerformance extends Logging {
 
   def main(args: Array[String]): Unit = {
     
@@ -127,7 +125,7 @@ object ConsumerPerformance {
           }
 
           private def printMessage(totalBytesRead: Long, nMessages: Long, elapsedSecs: Double) = {
-            logger.info("thread[" + i + "], nMsgs:" + nMessages + " bytes:" + totalBytesRead +
+            info("thread[" + i + "], nMsgs:" + nMessages + " bytes:" + totalBytesRead +
               " nMsgs/sec:" + (nMessages / elapsedSecs).formatted("%.2f") +
               " MB/sec:" + (totalBytesRead / elapsedSecs / (1024.0*1024.0)).formatted("%.2f"))
 
@@ -135,9 +133,9 @@ object ConsumerPerformance {
           private def shutdownComplete() = shutdownLatch.countDown
         }
 
-    logger.info("Sleeping for " + initialSleep / 1000 + " seconds.")
+    info("Sleeping for " + initialSleep / 1000 + " seconds.")
     Thread.sleep(initialSleep)
-    logger.info("starting threads")
+    info("starting threads")
     for (thread <- threadList)
       thread.start
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala Wed Nov 23 07:21:16 2011
@@ -18,9 +18,8 @@
 package kafka.tools
 
 import joptsimple._
-import kafka.utils.Utils
+import kafka.utils.{Utils, Logging}
 import java.util.concurrent.CountDownLatch
-import org.apache.log4j.Logger
 import kafka.consumer._
 import kafka.serializer.StringDecoder
 
@@ -28,7 +27,6 @@ import kafka.serializer.StringDecoder
  * Program to read using the rich consumer and dump the results to standard out
  */
 object ConsumerShell {
-  val logger = Logger.getLogger(getClass)
   def main(args: Array[String]): Unit = {
     
     val parser = new OptionParser
@@ -84,9 +82,8 @@ object ConsumerShell {
   }
 }
 
-class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread {
+class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread with Logging {
   val shutdownLatch = new CountDownLatch(1)
-  val logger = Logger.getLogger(getClass)
 
   override def run() {
     println("Starting consumer thread..")
@@ -98,7 +95,7 @@ class ZKConsumerThread(stream: KafkaMess
       }
     }catch {
       case e:ConsumerTimeoutException => // this is ok
-      case oe: Exception => logger.error("error in ZKConsumerThread", oe)
+      case oe: Exception => error("error in ZKConsumerThread", oe)
     }
     shutdownLatch.countDown
     println("Received " + count + " messages")

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala Wed Nov 23 07:21:16 2011
@@ -17,13 +17,12 @@
 
 package kafka.tools
 
-import kafka.utils.Utils
+import kafka.utils.{Utils, Logging}
 import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.producer._
 import async.DefaultEventHandler
 import kafka.serializer.StringEncoder
-import org.apache.log4j.Logger
 import joptsimple.{OptionSet, OptionParser}
 import java.util.{Random, Properties}
 import kafka.message.{CompressionCodec, Message, ByteBufferMessageSet}
@@ -31,14 +30,13 @@ import kafka.message.{CompressionCodec, 
 /**
  * Load test for the producer
  */
-object ProducerPerformance {
+object ProducerPerformance extends Logging {
 
   def main(args: Array[String]) {
 
-    val logger = Logger.getLogger(getClass)
     val config = new PerfConfig(args)
     if(!config.isFixSize)
-      logger.info("WARN: Throughput will be slower due to changing message size per request")
+      info("WARN: Throughput will be slower due to changing message size per request")
 
     val totalBytesSent = new AtomicLong(0)
     val totalMessagesSent = new AtomicLong(0)
@@ -56,9 +54,9 @@ object ProducerPerformance {
 
     allDone.await()
     val elapsedSecs = (System.currentTimeMillis - startMs) / 1000.0
-    logger.info("Total Num Messages: " + totalMessagesSent.get + " bytes: " + totalBytesSent.get + " in " + elapsedSecs + " secs")
-    logger.info("Messages/sec: " + (1.0 * totalMessagesSent.get / elapsedSecs).formatted("%.4f"))
-    logger.info("MB/sec: " + (totalBytesSent.get / elapsedSecs / (1024.0*1024.0)).formatted("%.4f"))
+    info("Total Num Messages: " + totalMessagesSent.get + " bytes: " + totalBytesSent.get + " in " + elapsedSecs + " secs")
+    info("Messages/sec: " + (1.0 * totalMessagesSent.get / elapsedSecs).formatted("%.4f"))
+    info("MB/sec: " + (totalBytesSent.get / elapsedSecs / (1024.0*1024.0)).formatted("%.4f"))
     System.exit(0)
   }
 
@@ -136,8 +134,7 @@ object ProducerPerformance {
                             val totalBytesSent: AtomicLong,
                             val totalMessagesSent: AtomicLong,
                             val allDone: CountDownLatch,
-                            val rand: Random) extends Runnable {
-    val logger = Logger.getLogger(getClass)
+                            val rand: Random) extends Runnable with Logging {
     val brokerInfoList = config.brokerInfo.split("=")
     val props = new Properties()
     if (brokerInfoList(0) == "zk.connect")
@@ -151,7 +148,7 @@ object ProducerPerformance {
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("buffer.size", (64*1024).toString)
     props.put("queue.enqueueTimeout.ms", "-1")
-    logger.info("Producer properties = " + props.toString)
+    info("Producer properties = " + props.toString)
 
     val producerConfig = new ProducerConfig(props)
     val producer = new Producer[String, String](producerConfig, new StringEncoder,
@@ -166,7 +163,7 @@ object ProducerPerformance {
       var reportTime = System.currentTimeMillis()
       var lastReportTime = reportTime
       val messagesPerThread = config.numMessages / config.numThreads
-      logger.info("Messages per thread = " + messagesPerThread)
+      info("Messages per thread = " + messagesPerThread)
       for(j <- 0 until messagesPerThread) {
         var strLength = config.messageSize
         if (!config.isFixSize) {
@@ -183,7 +180,7 @@ object ProducerPerformance {
         }
         if(nSends % config.reportingInterval == 0) {
           reportTime = System.currentTimeMillis()
-          logger.info("thread " + threadId + ": " + nSends + " messages sent "
+          info("thread " + threadId + ": " + nSends + " messages sent "
             + (1000.0 * (nSends - lastNSends) / (reportTime - lastReportTime)).formatted("%.4f") + " nMsg/sec "
             + (1000.0 * (bytesSent - lastBytesSent) / (reportTime - lastReportTime) / (1024 * 1024)).formatted("%.4f") + " MBs/sec")
           lastReportTime = reportTime
@@ -203,8 +200,7 @@ object ProducerPerformance {
                            val totalBytesSent: AtomicLong,
                            val totalMessagesSent: AtomicLong,
                            val allDone: CountDownLatch,
-                           val rand: Random) extends Runnable {
-    val logger = Logger.getLogger(getClass)
+                           val rand: Random) extends Runnable with Logging {
     val props = new Properties()
     val brokerInfoList = config.brokerInfo.split("=")
     if (brokerInfoList(0) == "zk.connect")
@@ -228,7 +224,7 @@ object ProducerPerformance {
       var reportTime = System.currentTimeMillis()
       var lastReportTime = reportTime
       val messagesPerThread = config.numMessages / config.numThreads / config.batchSize
-      logger.info("Messages per thread = " + messagesPerThread)
+      info("Messages per thread = " + messagesPerThread)
       var messageSet: List[String] = Nil
       for(k <- 0 until config.batchSize) {
         messageSet ::= message
@@ -251,7 +247,7 @@ object ProducerPerformance {
         }
         if(nSends % config.reportingInterval == 0) {
           reportTime = System.currentTimeMillis()
-          logger.info("thread " + threadId + ": " + nSends + " messages sent "
+          info("thread " + threadId + ": " + nSends + " messages sent "
             + (1000.0 * (nSends - lastNSends) * config.batchSize / (reportTime - lastReportTime)).formatted("%.4f") + " nMsg/sec "
             + (1000.0 * (bytesSent - lastBytesSent) / (reportTime - lastReportTime) / (1024 * 1024)).formatted("%.4f") + " MBs/sec")
           lastReportTime = reportTime

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Wed Nov 23 07:21:16 2011
@@ -26,15 +26,14 @@ import kafka.producer.async.DefaultEvent
 import kafka.serializer.{DefaultEncoder, StringEncoder}
 import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
 import kafka.consumer._
-import kafka.utils.{ZKStringSerializer, Utils}
+import kafka.utils.{ZKStringSerializer, Utils, Logging}
 import kafka.api.OffsetRequest
 import org.I0Itec.zkclient._
 import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
 
-object ReplayLogProducer {
+object ReplayLogProducer extends Logging {
 
   private val GROUPID: String = "replay-log-producer"
-  private val logger = Logger.getLogger(getClass)
 
   def main(args: Array[String]) {
     var isNoPrint = false;
@@ -147,7 +146,7 @@ object ReplayLogProducer {
   def tryCleanupZookeeper(zkUrl: String, groupId: String) {
     try {
       val dir = "/consumers/" + groupId
-      logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
+      info("Cleaning up temporary zookeeper data under " + dir + ".")
       val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
       zk.close()
@@ -156,9 +155,8 @@ object ReplayLogProducer {
     }
   }
 
-  class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread {
+  class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
-    val logger = Logger.getLogger(getClass)
     val props = new Properties()
     val brokerInfoList = config.brokerInfo.split("=")
     if (brokerInfoList(0) == "zk.connect")
@@ -179,7 +177,7 @@ object ReplayLogProducer {
                                                   null, new DefaultPartitioner[Message])
 
     override def run() {
-      logger.info("Starting consumer thread..")
+      info("Starting consumer thread..")
       var messageCount: Int = 0
       try {
         val iter =
@@ -194,15 +192,15 @@ object ReplayLogProducer {
               Thread.sleep(config.delayedMSBtwSend)
             messageCount += 1
           }catch {
-            case ie: Exception => logger.error("Skipping this message", ie)
+            case ie: Exception => error("Skipping this message", ie)
           }
         }
       }catch {
-        case e: ConsumerTimeoutException => logger.error("consumer thread timing out", e)
+        case e: ConsumerTimeoutException => error("consumer thread timing out", e)
       }
-      logger.info("Sent " + messageCount + " messages")
+      info("Sent " + messageCount + " messages")
       shutdownLatch.countDown
-      logger.info("thread finished execution !" )
+      info("thread finished execution !" )
     }
 
     def shutdown() {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Wed Nov 23 07:21:16 2011
@@ -23,17 +23,14 @@ import kafka.api.FetchRequest
 import kafka.utils._
 import kafka.consumer._
 import kafka.server._
-import org.apache.log4j.Logger
 
 /**
  * Command line program to dump out messages to standard out using the simple consumer
  */
-object SimpleConsumerShell {
+object SimpleConsumerShell extends Logging {
 
   def main(args: Array[String]): Unit = {
 
-    val logger = Logger.getLogger(getClass)
-
     val parser = new OptionParser
     val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
                            .withRequiredArg
@@ -73,7 +70,7 @@ object SimpleConsumerShell {
     
     for(arg <- List(urlOpt, topicOpt)) {
       if(!options.has(arg)) {
-        logger.error("Missing required argument \"" + arg + "\"")
+        error("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
         System.exit(1)
       }
@@ -87,7 +84,7 @@ object SimpleConsumerShell {
     val printOffsets = if(options.has(printOffsetOpt)) true else false
     val printMessages = if(options.has(printMessageOpt)) true else false
 
-    logger.info("Starting consumer...")
+    info("Starting consumer...")
     val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024)
     val thread = Utils.newThread("kafka-consumer", new Runnable() {
       def run() {
@@ -96,15 +93,14 @@ object SimpleConsumerShell {
           val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
           val messageSets = consumer.multifetch(fetchRequest)
           for (messages <- messageSets) {
-            if(logger.isDebugEnabled)
-            	logger.debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
+            debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
             var consumed = 0
             for(messageAndOffset <- messages) {
               if(printMessages)
-                logger.info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+                info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
               offset = messageAndOffset.offset
               if(printOffsets)
-                logger.info("next offset = " + offset)
+                info("next offset = " + offset)
               consumed += 1
             }
           }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala Wed Nov 23 07:21:16 2011
@@ -20,14 +20,12 @@ package kafka.utils
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.utils._
-import org.apache.log4j.Logger
 
 /**
  * A scheduler for running jobs in the background
  * TODO: ScheduledThreadPoolExecutor notoriously swallows exceptions
  */
-class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) {
-  private val logger = Logger.getLogger(getClass())
+class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) extends Logging {
   private val threadId = new AtomicLong(0)
   private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
     def newThread(runnable: Runnable): Thread = {
@@ -44,11 +42,11 @@ class KafkaScheduler(val numThreads: Int
 
   def shutdownNow() {
     executor.shutdownNow()
-    logger.info("force shutdown scheduler " + baseThreadName)
+    info("force shutdown scheduler " + baseThreadName)
   }
 
   def shutdown() {
     executor.shutdown()
-    logger.info("shutdown scheduler " + baseThreadName)
+    info("shutdown scheduler " + baseThreadName)
   }
 }

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala?rev=1205311&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala Wed Nov 23 07:21:16 2011
@@ -0,0 +1,77 @@
+package kafka.utils
+
+import org.apache.log4j.Logger
+
+trait Logging {
+  val loggerName = this.getClass.getName
+  lazy val logger = Logger.getLogger(loggerName)
+
+  def trace(msg: => String): Unit = {
+    if (logger.isTraceEnabled())
+      logger.trace(msg)	
+  }
+  def trace(e: => Throwable): Any = {
+    if (logger.isTraceEnabled())
+      logger.trace("",e)	
+  }
+  def trace(msg: => String, e: => Throwable) = {
+    if (logger.isTraceEnabled())
+      logger.trace(msg,e)
+  }
+
+  def debug(msg: => String): Unit = {
+    if (logger.isDebugEnabled())
+      logger.debug(msg)
+  }
+  def debug(e: => Throwable): Any = {
+    if (logger.isDebugEnabled())
+      logger.debug("",e)	
+  }
+  def debug(msg: => String, e: => Throwable) = {
+    if (logger.isDebugEnabled())
+      logger.debug(msg,e)
+  }
+
+  def info(msg: => String): Unit = {
+    if (logger.isInfoEnabled())
+      logger.info(msg)
+  }
+  def info(e: => Throwable): Any = {
+    if (logger.isInfoEnabled())
+      logger.info("",e)
+  }
+  def info(msg: => String,e: => Throwable) = {
+    if (logger.isInfoEnabled())
+      logger.info(msg,e)
+  }
+
+  def warn(msg: => String): Unit = {
+    logger.warn(msg)
+  }
+  def warn(e: => Throwable): Any = {
+    logger.warn("",e)
+  }
+  def warn(msg: => String, e: => Throwable) = {
+    logger.warn(msg,e)
+  }	
+
+  def error(msg: => String):Unit = {
+    logger.error(msg)
+  }		
+  def error(e: => Throwable): Any = {
+    logger.error("",e)
+  }
+  def error(msg: => String, e: => Throwable) = {
+    logger.error(msg,e)
+  }
+
+  def fatal(msg: => String): Unit = {
+    logger.fatal(msg)
+  }
+  def fatal(e: => Throwable): Any = {
+    logger.fatal("",e)
+  }	
+  def fatal(msg: => String, e: => Throwable) = {
+    logger.fatal(msg,e)
+  }
+}
\ No newline at end of file
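
The trait above is what every file touched by this commit now mixes in, replacing the per-class "private val logger = Logger.getLogger(...)" boilerplate. A minimal sketch of the resulting usage pattern follows; the class and method names are invented for illustration, only the pattern itself comes from this commit:

    import kafka.utils.Logging

    // Hypothetical class, not part of this commit.
    class OffsetCleaner extends Logging {
      // loggerName is derived from this.getClass.getName, so log output is still
      // attributed to the concrete class, as with the old explicit Logger lookup.
      def clean(path: String) {
        info("cleaning offsets under " + path)   // was: logger.info(...)
        try {
          // ... actual cleanup work would go here ...
        } catch {
          case e: Exception => error("failed to clean " + path, e)
        }
      }
    }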

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala Wed Nov 23 07:21:16 2011
@@ -20,7 +20,6 @@ package kafka.utils
 
 import java.lang.management.ManagementFactory
 import javax.management.ObjectName
-import org.apache.log4j.Logger
 
 /**
  * If mx4j-tools is in the classpath call maybeLoad to load the HTTP interface of mx4j.
@@ -31,8 +30,7 @@ import org.apache.log4j.Logger
  *
  * This is a Scala port of org.apache.cassandra.utils.Mx4jTool written by Ran Tavory for CASSANDRA-1068
  * */
-object Mx4jLoader {
-  private val logger = Logger.getLogger(getClass())
+object Mx4jLoader extends Logging {
 
   def maybeLoad(): Boolean = {
     if (!Utils.getBoolean(System.getProperties(), "kafka_mx4jenable", false))
@@ -40,7 +38,7 @@ object Mx4jLoader {
     val address = System.getProperty("mx4jaddress", "0.0.0.0")
     val port = Utils.getInt(System.getProperties(), "mx4jport", 8082)
     try {
-      logger.debug("Will try to load MX4j now, if it's in the classpath");
+      debug("Will try to load MX4j now, if it's in the classpath");
 
       val mbs = ManagementFactory.getPlatformMBeanServer()
       val processorName = new ObjectName("Server:name=XSLTProcessor")
@@ -58,15 +56,15 @@ object Mx4jLoader {
       httpAdaptorClass.getMethod("setProcessor", Class.forName("mx4j.tools.adaptor.http.ProcessorMBean")).invoke(httpAdaptor, xsltProcessor.asInstanceOf[AnyRef])
       mbs.registerMBean(xsltProcessor, processorName)
       httpAdaptorClass.getMethod("start").invoke(httpAdaptor)
-      logger.info("mx4j successfuly loaded")
+      info("mx4j successfuly loaded")
       true
     }
     catch {
 	  case e: ClassNotFoundException => {
-        logger.info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
+        info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
       }
       case e => {
-        logger.warn("Could not start register mbean in JMX", e);
+        warn("Could not start register mbean in JMX", e);
       }
     }
     false
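
Mx4jLoader is driven entirely by system properties (kafka_mx4jenable, mx4jaddress, mx4jport). A sketch of enabling it programmatically; the property names and defaults are the ones read in the hunk above, the object name is invented:

    import kafka.utils.Mx4jLoader

    object EnableMx4j {
      def main(args: Array[String]) {
        // Equivalent to passing -Dkafka_mx4jenable=true etc. on the JVM command line.
        System.setProperty("kafka_mx4jenable", "true")
        System.setProperty("mx4jaddress", "0.0.0.0")   // default when unset
        System.setProperty("mx4jport", "8082")         // default when unset
        // true only if mx4j-tools is on the classpath and the HTTP adaptor started.
        val started = Mx4jLoader.maybeLoad()
        println("mx4j started: " + started)
      }
    }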

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala Wed Nov 23 07:21:16 2011
@@ -17,11 +17,9 @@
 
 package kafka.utils;
 
-import org.apache.log4j.Logger
 import scala.math._
 
-object Throttler {
-  val logger = Logger.getLogger(classOf[Throttler])
+object Throttler extends Logging {
   val DefaultCheckIntervalMs = 100L
 }
 
@@ -67,8 +65,7 @@ class Throttler(val desiredRatePerSec: D
           val ellapsedMs = ellapsedNs / Time.NsPerMs
           val sleepTime = round(observedSoFar / desiredRateMs - ellapsedMs)
           if(sleepTime > 0) {
-            if(Throttler.logger.isDebugEnabled)
-              Throttler.logger.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec + 
+            Throttler.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec + 
                                      ", sleeping for " + sleepTime + " ms to compensate.")
             time.sleep(sleepTime)
           }
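
The hunk above (like the one in SimpleConsumerShell) can drop the explicit isDebugEnabled guard because the trait's methods take their message by name (msg: => String), so the argument expression is only evaluated when the level is enabled. A small illustration with invented names:

    import kafka.utils.Logging

    object ByNameDemo extends Logging {
      // Stand-in for a message that is expensive to build.
      def expensiveDump(): String = { Thread.sleep(100); "lots of state" }

      def main(args: Array[String]) {
        // Because debug takes msg: => String, expensiveDump() is never invoked
        // unless DEBUG is enabled for this logger, so no guard is needed.
        debug("state: " + expensiveDump())
      }
    }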

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Wed Nov 23 07:21:16 2011
@@ -33,8 +33,7 @@ import kafka.message.{NoCompressionCodec
 /**
  * Helper functions!
  */
-object Utils {
-  private val logger = Logger.getLogger(getClass())
+object Utils extends Logging {
   
   /**
    * Wrap the given function in a java.lang.Runnable
@@ -60,8 +59,8 @@ object Utils {
         catch {
           case t =>
             // log any error and the stack trace
-            logger.error(t, t)
-            logger.error(stackTrace(t), t)
+            error(t)
+            error(stackTrace(t), t)
         }
       }
     }
@@ -362,17 +361,29 @@ object Utils {
   
   /**
    * Register the given mbean with the platform mbean server,
-   * unregistering any mbean that was there before
+   * unregistering any mbean that was there before. Note,
+   * this method will not throw an exception if the registration
+   * fails (since there is nothing you can do and it isn't fatal),
+   * instead it just returns false indicating the registration failed.
    * @param mbean The object to register as an mbean
    * @param name The name to register this mbean with
+   * @returns true if the registration succeeded
    */
-  def registerMBean(mbean: Object, name: String) {
-    val mbs = ManagementFactory.getPlatformMBeanServer()
-    mbs synchronized {
-      val objName = new ObjectName(name)
-      if(mbs.isRegistered(objName))
-        mbs.unregisterMBean(objName)
-      mbs.registerMBean(mbean, objName)
+  def registerMBean(mbean: Object, name: String): Boolean = {
+    try {
+      val mbs = ManagementFactory.getPlatformMBeanServer()
+      mbs synchronized {
+        val objName = new ObjectName(name)
+        if(mbs.isRegistered(objName))
+          mbs.unregisterMBean(objName)
+        mbs.registerMBean(mbean, objName)
+        true
+      }
+    } catch {
+      case e: Exception => {
+        error("Failed to register Mbean " + name, e)
+        false
+      }
     }
   }
   
@@ -525,10 +536,10 @@ object Utils {
     {
      try{
       val tempSplit = csVals(i).split(":")
-      logger.info(successMsg + tempSplit(0) + " : " + Integer.parseInt(tempSplit(1).trim))
+      info(successMsg + tempSplit(0) + " : " + Integer.parseInt(tempSplit(1).trim))
       map += tempSplit(0).asInstanceOf[K] -> Integer.parseInt(tempSplit(1).trim).asInstanceOf[V]
       } catch {
-          case _ =>  logger.error(exceptionMsg + ": " + csVals(i))
+          case _ =>  error(exceptionMsg + ": " + csVals(i))
         }
     }
     map
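
With the change above, Utils.registerMBean no longer propagates registration failures; it logs them and returns false, making JMX registration best-effort. A sketch of a caller checking the new return value; the bean class and JMX name are hypothetical:

    import kafka.utils.Utils

    // A minimal standard MBean, invented for this example.
    trait DemoStatsMBean { def getCount: Int }
    class DemoStats extends DemoStatsMBean { def getCount = 42 }

    object RegisterDemo {
      def main(args: Array[String]) {
        val ok = Utils.registerMBean(new DemoStats, "demo:type=DemoStats")
        if (!ok)
          // The failure has already been logged inside registerMBean; the caller
          // only decides whether running without JMX is acceptable.
          println("JMX registration failed; continuing without the MBean")
      }
    }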

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala Wed Nov 23 07:21:16 2011
@@ -23,13 +23,11 @@ import kafka.cluster.{Broker, Cluster}
 import scala.collection._
 import java.util.Properties
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
-import org.apache.log4j.Logger
 
-object ZkUtils {
+object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
-  private val logger = Logger.getLogger(getClass())  
 
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
@@ -83,12 +81,12 @@ object ZkUtils {
           case e2 => throw e2
         }
         if (storedData == null || storedData != data) {
-          logger.info("conflict in " + path + " data: " + data + " stored data: " + storedData)
+          info("conflict in " + path + " data: " + data + " stored data: " + storedData)
           throw e
         }
         else {
           // otherwise, the creation succeeded, return normally
-          logger.info(path + " exists with value " + data + " during connection loss; this is ok")
+          info(path + " exists with value " + data + " during connection loss; this is ok")
         }
       }
       case e2 => throw e2
@@ -142,7 +140,7 @@ object ZkUtils {
     catch {
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
-        logger.info(path + " deleted during connection loss; this is ok")
+        info(path + " deleted during connection loss; this is ok")
       case e2 => throw e2
     }
   }
@@ -154,7 +152,7 @@ object ZkUtils {
     catch {
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
-        logger.info(path + " deleted during connection loss; this is ok")
+        info(path + " deleted during connection loss; this is ok")
       case e2 => throw e2
     }
   }

Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala Wed Nov 23 07:21:16 2011
@@ -19,11 +19,10 @@ package kafka
 
 import message.Message
 import org.apache.log4j.{Logger, PropertyConfigurator}
+import kafka.utils.Logging
 import serializer.Encoder
 
-object TestKafkaAppender {
-
-  private val logger = Logger.getLogger(TestKafkaAppender.getClass)
+object TestKafkaAppender extends Logging {
   
   def main(args:Array[String]) {
     
@@ -41,7 +40,7 @@ object TestKafkaAppender {
     }
 
     for(i <- 1 to 10)
-      logger.info("test")    
+      info("test")    
   }
 }
 


