kafka-commits mailing list archives

From: jkr...@apache.org
Subject: svn commit: r1205311 [1/3] - in /incubator/kafka/trunk/core/src: main/scala/kafka/ main/scala/kafka/consumer/ main/scala/kafka/consumer/storage/ main/scala/kafka/javaapi/ main/scala/kafka/javaapi/message/ main/scala/kafka/log/ main/scala/kafka/message/...
Date: Wed, 23 Nov 2011 07:21:23 GMT
Author: jkreps
Date: Wed Nov 23 07:21:16 2011
New Revision: 1205311

URL: http://svn.apache.org/viewvc?rev=1205311&view=rev
Log:
KAFKA-193 Add a logging helper trait and use it where logging is used. Patch from Joe Stein.


Added:
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Logging.scala
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServerStartable.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Throttler.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
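
The contents of the new kafka/utils/Logging.scala fall outside this first part of the commit, so the following is only a rough sketch of what such a helper trait might look like, inferred from the call sites in the diffs below: trace/debug/info/warn/error/fatal shorthands, Throwable overloads for calls such as fatal(e), and by-name messages so the old isDebugEnabled/isTraceEnabled guards can live inside the trait rather than at every call site. All names and signatures here are assumptions, not the committed code:

    package kafka.utils

    import org.apache.log4j.Logger

    // Sketch only: a mix-in that gives each class a log4j Logger named after the
    // concrete class, plus shorthand logging methods, so classes no longer need
    // to declare their own private logger field.
    trait Logging {
      lazy val logger: Logger = Logger.getLogger(this.getClass.getName)

      // by-name messages: the string is only built if the level is enabled
      def trace(msg: => String): Unit = if (logger.isTraceEnabled) logger.trace(msg)
      def debug(msg: => String): Unit = if (logger.isDebugEnabled) logger.debug(msg)
      def info(msg: => String): Unit  = if (logger.isInfoEnabled) logger.info(msg)
      def warn(msg: => String): Unit  = logger.warn(msg)
      def warn(msg: => String, e: Throwable): Unit  = logger.warn(msg, e)
      def error(msg: => String): Unit = logger.error(msg)
      def error(msg: => String, e: Throwable): Unit = logger.error(msg, e)
      def error(e: Throwable): Unit   = logger.error(e.getMessage, e)
      def fatal(msg: => String): Unit = logger.fatal(msg)
      def fatal(msg: => String, e: Throwable): Unit = logger.fatal(msg, e)
      def fatal(e: Throwable): Unit   = logger.fatal(e.getMessage, e)
    }

With a trait along these lines in place, each class or object in the diffs below drops its "private val logger = Logger.getLogger(...)" and simply mixes the trait in (for example, "object ConsoleConsumer extends Logging"), calling error(...)/info(...)/debug(...) directly.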

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/Kafka.scala Wed Nov 23 07:21:16 2011
@@ -18,18 +18,17 @@
 package kafka
 
 import consumer.ConsumerConfig
-import org.apache.log4j.Logger
 import producer.ProducerConfig
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
-import utils.Utils
+import utils.{Utils, Logging}
 import org.apache.log4j.jmx.LoggerDynamicMBean
 
-object Kafka {
-  private val logger = Logger.getLogger(Kafka.getClass)
+object Kafka extends Logging {
 
   def main(args: Array[String]): Unit = {
     val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j"
-    Utils.swallow(logger.warn, Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName))
+    import org.apache.log4j.Logger
+    Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)
 
     if (!List(1, 3).contains(args.length)) {
       println("USAGE: java [options] %s server.properties [consumer.properties producer.properties]".format(classOf[KafkaServer].getSimpleName()))
@@ -61,7 +60,7 @@ object Kafka {
       kafkaServerStartble.awaitShutdown
     }
     catch {
-      case e => logger.fatal(e)
+      case e => fatal(e)
     }
     System.exit(0)
   }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Wed Nov 23 07:21:16 2011
@@ -21,13 +21,12 @@ import scala.collection.mutable._
 import scala.collection.JavaConversions._
 import org.I0Itec.zkclient._
 import joptsimple._
-import org.apache.log4j.Logger
 import java.util.Arrays.asList
 import java.util.Properties
 import java.util.Random
 import java.io.PrintStream
 import kafka.message._
-import kafka.utils.Utils
+import kafka.utils.{Utils, Logging}
 import kafka.utils.ZkUtils
 import kafka.utils.ZKStringSerializer
 
@@ -35,9 +34,7 @@ import kafka.utils.ZKStringSerializer
  * Consumer that dumps messages out to standard out.
  *
  */
-object ConsoleConsumer {
-  
-  private val logger = Logger.getLogger(getClass())
+object ConsoleConsumer extends Logging {
 
   def main(args: Array[String]) {
     val parser = new OptionParser
@@ -136,7 +133,7 @@ object ConsoleConsumer {
         } catch {
           case e =>
             if (skipMessageOnError)
-              logger.error("Error processing message, skipping this message: ", e)
+              error("Error processing message, skipping this message: ", e)
             else
               throw e
         }
@@ -149,7 +146,7 @@ object ConsoleConsumer {
         }
       }
     } catch {
-      case e => logger.error("Error processing message, stopping consumer: ", e)
+      case e => error("Error processing message, stopping consumer: ", e)
     }
       
     System.out.flush()
@@ -171,7 +168,7 @@ object ConsoleConsumer {
   def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
     for(arg <- required) {
       if(!options.has(arg)) {
-        logger.error("Missing required argument \"" + arg + "\"")
+        error("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
         System.exit(1)
       }
@@ -207,7 +204,7 @@ object ConsoleConsumer {
   def tryCleanupZookeeper(zkUrl: String, groupId: String) {
     try {
       val dir = "/consumers/" + groupId
-      logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
+      info("Cleaning up temporary zookeeper data under " + dir + ".")
       val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
       zk.close()

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Wed Nov 23 07:21:16 2011
@@ -18,8 +18,7 @@
 package kafka.consumer
 
 import scala.collection._
-import kafka.utils.Utils
-import org.apache.log4j.Logger
+import kafka.utils.{Utils, Logging}
 import kafka.serializer.{DefaultDecoder, Decoder}
 
 /**
@@ -48,8 +47,7 @@ trait ConsumerConnector {
   def shutdown()
 }
 
-object Consumer {
-  private val logger = Logger.getLogger(getClass())  
+object Consumer extends Logging {
   private val consumerStatsMBeanName = "kafka:type=kafka.ConsumerStats"
 
   /**
@@ -60,7 +58,7 @@ object Consumer {
    */
   def create(config: ConsumerConfig): ConsumerConnector = {
     val consumerConnect = new ZookeeperConsumerConnector(config)
-    Utils.swallow(logger.warn, Utils.registerMBean(consumerConnect, consumerStatsMBeanName))
+    Utils.registerMBean(consumerConnect, consumerStatsMBeanName)
     consumerConnect
   }
 
@@ -72,7 +70,7 @@ object Consumer {
    */
   def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
     val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
-    Utils.swallow(logger.warn, Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName))
+    Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName)
     consumerConnect
   }
 }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Wed Nov 23 07:21:16 2011
@@ -17,8 +17,7 @@
 
 package kafka.consumer
 
-import kafka.utils.IteratorTemplate
-import org.apache.log4j.Logger
+import kafka.utils.{IteratorTemplate, Logging}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
 import kafka.cluster.Partition
 import kafka.message.{MessageAndOffset, MessageSet, Message}
@@ -33,9 +32,8 @@ class ConsumerIterator[T](private val to
                           private val channel: BlockingQueue[FetchedDataChunk],
                           consumerTimeoutMs: Int,
                           private val decoder: Decoder[T])
-        extends IteratorTemplate[T] {
+        extends IteratorTemplate[T] with Logging {
   
-  private val logger = Logger.getLogger(classOf[ConsumerIterator[T]])
   private var current: Iterator[MessageAndOffset] = null
   private var currentDataChunk: FetchedDataChunk = null
   private var currentTopicInfo: PartitionTopicInfo = null
@@ -46,8 +44,7 @@ class ConsumerIterator[T](private val to
     if(consumedOffset < 0)
       throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
     currentTopicInfo.resetConsumeOffset(consumedOffset)
-    if(logger.isTraceEnabled)
-      logger.trace("Setting consumed offset to %d".format(consumedOffset))
+    trace("Setting consumed offset to %d".format(consumedOffset))
     ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
     decodedMessage
   }
@@ -64,14 +61,13 @@ class ConsumerIterator[T](private val to
         }
       }
       if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
-        if(logger.isDebugEnabled)
-          logger.debug("Received the shutdown command")
+        debug("Received the shutdown command")
         channel.offer(currentDataChunk)
         return allDone
       } else {
         currentTopicInfo = currentDataChunk.topicInfo
         if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
-          logger.error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
+          error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
                         .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
           currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
         }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala Wed Nov 23 07:21:16 2011
@@ -18,8 +18,7 @@
 package kafka.consumer
 
 import java.util.concurrent.atomic.AtomicLong
-import org.apache.log4j.Logger
-import kafka.utils.{Pool, Utils, threadsafe}
+import kafka.utils.{Pool, Utils, threadsafe, Logging}
 
 trait ConsumerTopicStatMBean {
   def getMessagesPerTopic: Long
@@ -34,8 +33,7 @@ class ConsumerTopicStat extends Consumer
   def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
 }
 
-object ConsumerTopicStat {
-  private val logger = Logger.getLogger(getClass())
+object ConsumerTopicStat extends Logging {
   private val stats = new Pool[String, ConsumerTopicStat]
 
   def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
@@ -43,7 +41,7 @@ object ConsumerTopicStat {
     if (stat == null) {
       stat = new ConsumerTopicStat
       if (stats.putIfNotExists(topic, stat) == null)
-        Utils.swallow(logger.warn, Utils.registerMBean(stat, "kafka:type=kafka.ConsumerTopicStat." + topic))
+        Utils.registerMBean(stat, "kafka:type=kafka.ConsumerTopicStat." + topic)
       else
         stat = stats.get(topic)
     }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala Wed Nov 23 07:21:16 2011
@@ -18,7 +18,6 @@
 package kafka.consumer
 
 import scala.collection._
-import org.apache.log4j.Logger
 import kafka.cluster._
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.BlockingQueue
@@ -27,7 +26,6 @@ import java.util.concurrent.BlockingQueu
  * The fetcher is a background thread that fetches data from a set of servers
  */
 private[consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkClient) {
-  private val logger = Logger.getLogger(getClass())
   private val EMPTY_FETCHER_THREADS = new Array[FetcherRunnable](0)
   @volatile
   private var fetcherThreads : Array[FetcherRunnable] = EMPTY_FETCHER_THREADS

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Wed Nov 23 07:21:16 2011
@@ -18,7 +18,6 @@
 package kafka.consumer
 
 import java.util.concurrent.CountDownLatch
-import org.apache.log4j.Logger
 import java.nio.channels.{ClosedChannelException, ClosedByInterruptException}
 import kafka.common.{OffsetOutOfRangeException, ErrorMapping}
 import kafka.cluster.{Partition, Broker}
@@ -32,8 +31,7 @@ class FetcherRunnable(val name: String,
                       val config: ConsumerConfig,
                       val broker: Broker,
                       val partitionTopicInfos: List[PartitionTopicInfo])
-  extends Thread(name) {
-  private val logger = Logger.getLogger(getClass())
+  extends Thread(name) with Logging {
   private val shutdownLatch = new CountDownLatch(1)
   private val simpleConsumer = new SimpleConsumer(broker.host, broker.port, config.socketTimeoutMs,
     config.socketBufferSize)
@@ -43,43 +41,42 @@ class FetcherRunnable(val name: String,
   def shutdown(): Unit = {
     stopped = true
     interrupt
-    logger.debug("awaiting shutdown on fetcher " + name)
+    debug("awaiting shutdown on fetcher " + name)
     shutdownLatch.await
-    logger.debug("shutdown of fetcher " + name + " thread complete")
+    debug("shutdown of fetcher " + name + " thread complete")
   }
 
   override def run() {
-    for (info <- partitionTopicInfos)
-      logger.info(name + " start fetching topic: " + info.topic + " part: " + info.partition.partId + " offset: "
-        + info.getFetchOffset + " from " + broker.host + ":" + broker.port)
+    for (infopti <- partitionTopicInfos)
+      info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partition.partId + " offset: "
+        + infopti.getFetchOffset + " from " + broker.host + ":" + broker.port)
 
     try {
       while (!stopped) {
         val fetches = partitionTopicInfos.map(info =>
           new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize))
 
-        if (logger.isTraceEnabled)
-          logger.trace("fetch request: " + fetches.toString)
+        trace("fetch request: " + fetches.toString)
 
         val response = simpleConsumer.multifetch(fetches : _*)
 
         var read = 0L
 
-        for((messages, info) <- response.zip(partitionTopicInfos)) {
+        for((messages, infopti) <- response.zip(partitionTopicInfos)) {
           try {
             var done = false
             if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
-              logger.info("offset for " + info + " out of range")
+              info("offset for " + infopti + " out of range")
               // see if we can fix this error
-              val resetOffset = resetConsumerOffsets(info.topic, info.partition)
+              val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partition)
               if(resetOffset >= 0) {
-                info.resetFetchOffset(resetOffset)
-                info.resetConsumeOffset(resetOffset)
+                infopti.resetFetchOffset(resetOffset)
+                infopti.resetConsumeOffset(resetOffset)
                 done = true
               }
             }
             if (!done)
-              read += info.enqueue(messages, info.getFetchOffset)
+              read += infopti.enqueue(messages, infopti.getFetchOffset)
           }
           catch {
             case e1: IOException =>
@@ -88,18 +85,17 @@ class FetcherRunnable(val name: String,
             case e2 =>
               if (!stopped) {
                 // this is likely a repeatable error, log it and trigger an exception in the consumer
-                logger.error("error in FetcherRunnable for " + info, e2)
-                info.enqueueError(e2, info.getFetchOffset)
+                error("error in FetcherRunnable for " + infopti, e2)
+                infopti.enqueueError(e2, infopti.getFetchOffset)
               }
               // re-throw the exception to stop the fetcher
               throw e2
           }
         }
 
-        if (logger.isTraceEnabled)
-          logger.trace("fetched bytes: " + read)
+        trace("fetched bytes: " + read)
         if(read == 0) {
-          logger.debug("backing off " + config.backoffIncrementMs + " ms")
+          debug("backing off " + config.backoffIncrementMs + " ms")
           Thread.sleep(config.backoffIncrementMs)
         }
       }
@@ -107,12 +103,12 @@ class FetcherRunnable(val name: String,
     catch {
       case e =>
         if (stopped)
-          logger.info("FecherRunnable " + this + " interrupted")
+          info("FecherRunnable " + this + " interrupted")
         else
-          logger.error("error in FetcherRunnable ", e)
+          error("error in FetcherRunnable ", e)
     }
 
-    logger.info("stopping fetcher " + name + " to host " + broker.host)
+    info("stopping fetcher " + name + " to host " + broker.host)
     Utils.swallow(logger.info, simpleConsumer.close)
     shutdownComplete()
   }
@@ -136,7 +132,7 @@ class FetcherRunnable(val name: String,
     val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
 
     // reset manually in zookeeper
-    logger.info("updating partition " + partition.name + " for topic " + topic + " with " +
+    info("updating partition " + partition.name + " for topic " + topic + " with " +
             (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0))
     ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString)
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala Wed Nov 23 07:21:16 2011
@@ -18,7 +18,6 @@
 package kafka.consumer
 
 import java.util.concurrent.BlockingQueue
-import org.apache.log4j.Logger
 import kafka.message.Message
 import kafka.serializer.{DefaultDecoder, Decoder}
 
@@ -32,7 +31,6 @@ class KafkaMessageStream[T](val topic: S
                             private val decoder: Decoder[T])
    extends Iterable[T] with java.lang.Iterable[T]{
 
-  private val logger = Logger.getLogger(getClass())
   private val iter: ConsumerIterator[T] =
     new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder)
     

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Wed Nov 23 07:21:16 2011
@@ -22,8 +22,8 @@ import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.message._
 import kafka.cluster._
+import kafka.utils.Logging
 import kafka.common.ErrorMapping
-import org.apache.log4j.Logger
 
 private[consumer] class PartitionTopicInfo(val topic: String,
                                            val brokerId: Int,
@@ -31,12 +31,10 @@ private[consumer] class PartitionTopicIn
                                            private val chunkQueue: BlockingQueue[FetchedDataChunk],
                                            private val consumedOffset: AtomicLong,
                                            private val fetchedOffset: AtomicLong,
-                                           private val fetchSize: AtomicInteger) {
-  private val logger = Logger.getLogger(getClass())
-  if (logger.isDebugEnabled) {
-    logger.debug("initial consumer offset of " + this + " is " + consumedOffset.get)
-    logger.debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
-  }
+                                           private val fetchSize: AtomicInteger) extends Logging {
+
+  debug("initial consumer offset of " + this + " is " + consumedOffset.get)
+  debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
 
   def getConsumeOffset() = consumedOffset.get
 
@@ -44,14 +42,12 @@ private[consumer] class PartitionTopicIn
 
   def resetConsumeOffset(newConsumeOffset: Long) = {
     consumedOffset.set(newConsumeOffset)
-    if (logger.isDebugEnabled)
-      logger.debug("reset consume offset of " + this + " to " + newConsumeOffset)
+    debug("reset consume offset of " + this + " to " + newConsumeOffset)
   }
 
   def resetFetchOffset(newFetchOffset: Long) = {
     fetchedOffset.set(newFetchOffset)
-    if (logger.isDebugEnabled)
-      logger.debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
+    debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
   }
 
   /**
@@ -62,12 +58,10 @@ private[consumer] class PartitionTopicIn
     val size = messages.validBytes
     if(size > 0) {
       // update fetched offset to the compressed data chunk size, not the decompressed message set size
-      if(logger.isTraceEnabled)
-        logger.trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
+      trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
       val newOffset = fetchedOffset.addAndGet(size)
-      if (logger.isDebugEnabled)
-        logger.debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
+      debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
     }
     size
   }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Wed Nov 23 07:21:16 2011
@@ -21,7 +21,6 @@ import java.net._
 import java.nio._
 import java.nio.channels._
 import java.util.concurrent.atomic._
-import org.apache.log4j.Logger
 import kafka.api._
 import kafka.common._
 import kafka.message._
@@ -35,8 +34,7 @@ import kafka.utils._
 class SimpleConsumer(val host: String,
                      val port: Int,
                      val soTimeout: Int,
-                     val bufferSize: Int) {
-  private val logger = Logger.getLogger(getClass())
+                     val bufferSize: Int) extends Logging {
   private var channel : SocketChannel = null
   private val lock = new Object()
 
@@ -45,23 +43,20 @@ class SimpleConsumer(val host: String,
     val address = new InetSocketAddress(host, port)
 
     val channel = SocketChannel.open
-    if(logger.isDebugEnabled)
-      logger.debug("Connected to " + address + " for fetching.")
+    debug("Connected to " + address + " for fetching.")
     channel.configureBlocking(true)
     channel.socket.setReceiveBufferSize(bufferSize)
     channel.socket.setSoTimeout(soTimeout)
     channel.socket.setKeepAlive(true)
     channel.connect(address)
-    if(logger.isTraceEnabled) {
-      logger.trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + channel.socket.getReceiveBufferSize)
-      logger.trace("soTimeout=" + soTimeout + " actual soTimeout= " + channel.socket.getSoTimeout)
-    }
+    trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + channel.socket.getReceiveBufferSize)
+    trace("soTimeout=" + soTimeout + " actual soTimeout= " + channel.socket.getSoTimeout)
+    
     channel
   }
 
   private def close(channel: SocketChannel) = {
-    if(logger.isDebugEnabled)
-      logger.debug("Disconnecting from " + channel.socket.getRemoteSocketAddress())
+    debug("Disconnecting from " + channel.socket.getRemoteSocketAddress())
     Utils.swallow(logger.warn, channel.close())
     Utils.swallow(logger.warn, channel.socket.close())
   }
@@ -90,7 +85,7 @@ class SimpleConsumer(val host: String,
         response = getResponse
       } catch {
         case e : java.io.IOException =>
-          logger.info("fetch reconnect due to " + e)
+          info("fetch reconnect due to " + e)
           // retry once
           try {
             channel = connect
@@ -124,7 +119,7 @@ class SimpleConsumer(val host: String,
         response = getResponse
       } catch {
         case e : java.io.IOException =>
-          logger.info("multifetch reconnect due to " + e)
+          info("multifetch reconnect due to " + e)
           // retry once
           try {
             channel = connect
@@ -160,7 +155,7 @@ class SimpleConsumer(val host: String,
         response = getResponse
       } catch {
         case e : java.io.IOException =>
-          logger.info("getOffsetsBefore reconnect due to " + e)
+          info("getOffsetsBefore reconnect due to " + e)
           // retry once
           try {
             channel = connect
@@ -222,11 +217,10 @@ class SimpleConsumerStats extends Simple
   def getConsumerThroughput: Double = fetchRequestStats.getThroughput
 }
 
-object SimpleConsumerStats {
-  private val logger = Logger.getLogger(getClass())
+object SimpleConsumerStats extends Logging {
   private val simpleConsumerstatsMBeanName = "kafka:type=kafka.SimpleConsumerStats"
   private val stats = new SimpleConsumerStats
-  Utils.swallow(logger.warn, Utils.registerMBean(stats, simpleConsumerstatsMBeanName))
+  Utils.registerMBean(stats, simpleConsumerstatsMBeanName)
 
   def recordFetchRequest(requestMs: Long) = stats.recordFetchRequest(requestMs)
   def recordConsumptionThroughput(data: Long) = stats.recordConsumptionThroughput(data)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala Wed Nov 23 07:21:16 2011
@@ -19,10 +19,9 @@ package kafka.consumer
 
 import scala.collection._
 import scala.util.parsing.json.JSON
-import org.apache.log4j.Logger
+import kafka.utils.Logging
 
-private[consumer] object TopicCount {
-  private val logger = Logger.getLogger(getClass())
+private[consumer] object TopicCount extends Logging {
   val myConversionFunc = {input : String => input.toInt}
   JSON.globalNumberParser = myConversionFunc
 
@@ -36,7 +35,7 @@ private[consumer] object TopicCount {
     }
     catch {
       case e =>
-        logger.error("error parsing consumer json string " + jsonString, e)
+        error("error parsing consumer json string " + jsonString, e)
         throw e
     }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Nov 23 07:21:16 2011
@@ -20,7 +20,6 @@ package kafka.consumer
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import scala.collection._
-import org.apache.log4j.Logger
 import kafka.cluster._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -85,9 +84,8 @@ trait ZookeeperConsumerConnectorMBean {
 
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
-  extends ConsumerConnector with ZookeeperConsumerConnectorMBean {
+  extends ConsumerConnector with ZookeeperConsumerConnectorMBean with Logging {
 
-  private val logger = Logger.getLogger(getClass())
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[Fetcher] = None
@@ -99,7 +97,7 @@ private[kafka] class ZookeeperConsumerCo
   connectZk()
   createFetcher()
   if (config.autoCommit) {
-    logger.info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
+    info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
     scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
   }
 
@@ -117,14 +115,14 @@ private[kafka] class ZookeeperConsumerCo
   }
 
   private def connectZk() {
-    logger.info("Connecting to zookeeper instance at " + config.zkConnect)
+    info("Connecting to zookeeper instance at " + config.zkConnect)
     zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
   }
 
   def shutdown() {
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
-      logger.info("ZKConsumerConnector shutting down")
+      info("ZKConsumerConnector shutting down")
       try {
         scheduler.shutdownNow()
         fetcher match {
@@ -141,16 +139,16 @@ private[kafka] class ZookeeperConsumerCo
       }
       catch {
         case e =>
-          logger.fatal("error during consumer connector shutdown", e)
+          fatal("error during consumer connector shutdown", e)
       }
-      logger.info("ZKConsumerConnector shut down completed")
+      info("ZKConsumerConnector shut down completed")
     }
   }
 
   def consume[T](topicCountMap: scala.collection.Map[String,Int],
                  decoder: Decoder[T])
       : Map[String,List[KafkaMessageStream[T]]] = {
-    logger.debug("entering consume ")
+    debug("entering consume ")
     if (topicCountMap == null)
       throw new RuntimeException("topicCountMap is null")
 
@@ -190,7 +188,7 @@ private[kafka] class ZookeeperConsumerCo
         streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder)
       }
       ret += (topic -> streamList)
-      logger.debug("adding topic " + topic + " and stream to map..")
+      debug("adding topic " + topic + " and stream to map..")
 
       // register on broker partition path changes
       val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic
@@ -203,30 +201,29 @@ private[kafka] class ZookeeperConsumerCo
   }
 
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
-    logger.info("begin registering consumer " + consumerIdString + " in ZK")
+    info("begin registering consumer " + consumerIdString + " in ZK")
     ZkUtils.createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
-    logger.info("end registering consumer " + consumerIdString + " in ZK")
+    info("end registering consumer " + consumerIdString + " in ZK")
   }
 
   private def sendShudownToAllQueues() = {
     for (queue <- queues.values) {
-      logger.debug("Clearing up queue")
+      debug("Clearing up queue")
       queue.clear()
       queue.put(ZookeeperConsumerConnector.shutdownCommand)
-      logger.debug("Cleared queue and sent shutdown command")
+      debug("Cleared queue and sent shutdown command")
     }
   }
 
   def autoCommit() {
-    if(logger.isTraceEnabled)
-      logger.trace("auto committing")
+    trace("auto committing")
     try {
       commitOffsets()
     }
     catch {
       case t: Throwable =>
       // log it and let it go
-        logger.error("exception during autoCommit: ", t)
+        error("exception during autoCommit: ", t)
     }
   }
 
@@ -244,10 +241,9 @@ private[kafka] class ZookeeperConsumerCo
         catch {
           case t: Throwable =>
           // log it and let it go
-            logger.warn("exception during commitOffsets",  t)
+            warn("exception during commitOffsets",  t)
         }
-        if(logger.isDebugEnabled)
-          logger.debug("Committed offset " + newOffset + " for topic " + info)
+        debug("Committed offset " + newOffset + " for topic " + info)
       }
     }
   }
@@ -297,7 +293,7 @@ private[kafka] class ZookeeperConsumerCo
     }
     catch {
       case e =>
-        logger.error("error in getConsumedOffset JMX ", e)
+        error("error in getConsumedOffset JMX ", e)
     }
     return -2
   }
@@ -318,7 +314,7 @@ private[kafka] class ZookeeperConsumerCo
     }
     catch {
       case e =>
-        logger.error("error in earliestOrLatestOffset() ", e)
+        error("error in earliestOrLatestOffset() ", e)
     }
     finally {
       if (simpleConsumer != null)
@@ -351,7 +347,7 @@ private[kafka] class ZookeeperConsumerCo
        *  connection for us. We need to release the ownership of the current consumer and re-register this
        *  consumer in the consumer registry and trigger a rebalance.
        */
-      logger.info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
+      info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
       loadBalancerListener.resetState
       registerConsumerInZK(dirs, consumerIdString, topicCount)
       // explicitly trigger load balancing for this consumer
@@ -380,8 +376,7 @@ private[kafka] class ZookeeperConsumerCo
         for(partition <- infos.keys) {
           val znode = topicDirs.consumerOwnerDir + "/" + partition
           ZkUtils.deletePath(zkClient, znode)
-          if(logger.isDebugEnabled)
-            logger.debug("Consumer " + consumerIdString + " releasing " + znode)
+          debug("Consumer " + consumerIdString + " releasing " + znode)
         }
       }
     }
@@ -430,7 +425,7 @@ private[kafka] class ZookeeperConsumerCo
     def syncedRebalance() {
       rebalanceLock synchronized {
         for (i <- 0 until ZookeeperConsumerConnector.MAX_N_RETRIES) {
-          logger.info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+          info("begin rebalancing consumer " + consumerIdString + " try #" + i)
           var done = false
           try {
             done = rebalance()
@@ -440,9 +435,9 @@ private[kafka] class ZookeeperConsumerCo
               // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
               // For example, a ZK node can disappear between the time we get all children and the time we try to get
               // the value of a child. Just let this go since another rebalance will be triggered.
-              logger.info("exception during rebalance ", e)
+              info("exception during rebalance ", e)
           }
-          logger.info("end rebalancing consumer " + consumerIdString + " try #" + i)
+          info("end rebalancing consumer " + consumerIdString + " try #" + i)
           if (done)
             return
           // release all partitions, reset state and retry
@@ -462,14 +457,14 @@ private[kafka] class ZookeeperConsumerCo
       val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
       val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
       if (relevantTopicThreadIdsMap.size <= 0) {
-        logger.info("Consumer " + consumerIdString + " with " + consumersPerTopicMap + " doesn't need to rebalance.")
+        info("Consumer " + consumerIdString + " with " + consumersPerTopicMap + " doesn't need to rebalance.")
         return true
       }
 
-      logger.info("Committing all offsets")
+      info("Committing all offsets")
       commitOffsets
 
-      logger.info("Releasing partition ownership")
+      info("Releasing partition ownership")
       releasePartitionOwnership()
 
       val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]]
@@ -484,7 +479,7 @@ private[kafka] class ZookeeperConsumerCo
         val nPartsPerConsumer = curPartitions.size / curConsumers.size
         val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
 
-        logger.info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
+        info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
           " for topic " + topic + " with consumers: " + curConsumers)
 
         for (consumerThreadId <- consumerThreadIdSet) {
@@ -498,11 +493,11 @@ private[kafka] class ZookeeperConsumerCo
            *  The first few consumers pick up an extra partition, if any.
            */
           if (nParts <= 0)
-            logger.warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
+            warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
           else {
             for (i <- startPart until startPart + nParts) {
               val partition = curPartitions(i)
-              logger.info(consumerThreadId + " attempting to claim partition " + partition)
+              info(consumerThreadId + " attempting to claim partition " + partition)
               if (!processPartition(topicDirs, partition, topic, consumerThreadId))
                 return false
             }
@@ -522,7 +517,7 @@ private[kafka] class ZookeeperConsumerCo
       for (partitionInfos <- topicRegistry.values)
         for (partition <- partitionInfos.values)
           allPartitionInfos ::= partition
-      logger.info("Consumer " + consumerIdString + " selected partitions : " +
+      info("Consumer " + consumerIdString + " selected partitions : " +
         allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(","))
 
       fetcher match {
@@ -540,7 +535,7 @@ private[kafka] class ZookeeperConsumerCo
       catch {
         case e: ZkNodeExistsException =>
         // The node hasn't been deleted by the original owner. So wait a bit and retry.
-          logger.info("waiting for the partition ownership to be deleted: " + partition)
+          info("waiting for the partition ownership to be deleted: " + partition)
           return false
         case e2 => throw e2
       }
@@ -579,8 +574,7 @@ private[kafka] class ZookeeperConsumerCo
                                                  fetchedOffset,
                                                  new AtomicInteger(config.fetchSize))
       partTopicInfoMap.put(partition, partTopicInfo)
-      if (logger.isDebugEnabled)
-        logger.debug(partTopicInfo + " selected new offset " + offset)
+      debug(partTopicInfo + " selected new offset " + offset)
     }
   }
 }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Wed Nov 23 07:21:16 2011
@@ -17,16 +17,13 @@
 
 package kafka.consumer
 
-import org.apache.log4j.Logger
 import scala.collection.JavaConversions._
-import kafka.utils.{Utils, ZkUtils, ZKStringSerializer}
+import kafka.utils.{Utils, ZkUtils, ZKStringSerializer, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
 class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
-    val eventHandler: TopicEventHandler[String]) {
-
-  private val logger = Logger.getLogger(getClass)
+    val eventHandler: TopicEventHandler[String]) extends Logging {
 
   val lock = new Object()
 
@@ -60,12 +57,12 @@ class ZookeeperTopicEventWatcher(val con
           zkClient = null
         }
         else
-          logger.warn("Cannot shutdown already shutdown topic event watcher.")
+          warn("Cannot shutdown already shutdown topic event watcher.")
       }
       catch {
         case e =>
-          logger.fatal(e)
-          logger.fatal(Utils.stackTrace(e))
+          fatal(e)
+          fatal(Utils.stackTrace(e))
       }
     }
   }
@@ -78,15 +75,15 @@ class ZookeeperTopicEventWatcher(val con
         try {
           if (zkClient != null) {
             val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
-            logger.debug("all topics: %s".format(latestTopics))
+            debug("all topics: %s".format(latestTopics))
 
             eventHandler.handleTopicEvent(latestTopics)
           }
         }
         catch {
           case e =>
-            logger.fatal(e)
-            logger.fatal(Utils.stackTrace(e))
+            fatal(e)
+            fatal(Utils.stackTrace(e))
         }
       }
     }
@@ -103,7 +100,7 @@ class ZookeeperTopicEventWatcher(val con
     def handleNewSession() {
       lock.synchronized {
         if (zkClient != null) {
-          logger.info(
+          info(
             "ZK expired: resubscribing topic event listener to topic registry")
           zkClient.subscribeChildChanges(
             ZkUtils.BrokerTopicsPath, topicEventListener)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala Wed Nov 23 07:21:16 2011
@@ -18,7 +18,6 @@
 package kafka.consumer.storage.sql
 
 import java.sql._
-import org.apache.log4j._
 import kafka.utils._
 import kafka.consumer.storage.OffsetStorage
 
@@ -26,9 +25,8 @@ import kafka.consumer.storage.OffsetStor
  * An offset storage implementation that uses an oracle database to save offsets
  */
 @nonthreadsafe
-class OracleOffsetStorage(val connection: Connection) extends OffsetStorage {
+class OracleOffsetStorage(val connection: Connection) extends OffsetStorage with Logging {
   
-  private val logger: Logger = Logger.getLogger(classOf[OracleOffsetStorage])
   private val lock = new Object
   connection.setAutoCommit(false)
   
@@ -43,8 +41,7 @@ class OracleOffsetStorage(val connection
       }
     }
     
-    if(logger.isDebugEnabled)
-      logger.debug("Reserved node " + node + " for topic '" + topic + " offset " + offset)
+    debug("Reserved node " + node + " for topic '" + topic + " offset " + offset)
     
     offset
   }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala Wed Nov 23 07:21:16 2011
@@ -17,14 +17,12 @@
 package kafka.javaapi
 
 import java.nio.ByteBuffer
-import org.apache.log4j.Logger
 import kafka.serializer.Encoder
 import kafka.producer.{ProducerConfig, ProducerPool}
 import kafka.producer.async.{AsyncProducerConfig, QueueItem}
+import kafka.utils.Logging
 
-private[javaapi] object Implicits {
-  private val logger = Logger.getLogger(getClass())
-
+private[javaapi] object Implicits extends Logging {
   implicit def javaMessageSetToScalaMessageSet(messageSet: kafka.javaapi.message.ByteBufferMessageSet):
      kafka.message.ByteBufferMessageSet = messageSet.underlying
 
@@ -35,14 +33,12 @@ private[javaapi] object Implicits {
   }
 
   implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
-    if(logger.isDebugEnabled)
-      logger.debug("Implicit instantiation of Java Sync Producer")
+    debug("Implicit instantiation of Java Sync Producer")
     new kafka.javaapi.producer.SyncProducer(producer)
   }
 
   implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = {
-    if(logger.isDebugEnabled)
-      logger.debug("Implicit instantiation of Sync Producer")
+    debug("Implicit instantiation of Sync Producer")
     producer.underlying
   }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Wed Nov 23 07:21:16 2011
@@ -18,13 +18,11 @@ package kafka.javaapi.message
 
 import java.nio.ByteBuffer
 import kafka.common.ErrorMapping
-import org.apache.log4j.Logger
 import kafka.message._
 
 class ByteBufferMessageSet(private val buffer: ByteBuffer,
                            private val initialOffset: Long = 0L,
                            private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
-  private val logger = Logger.getLogger(getClass())
   val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
                                                                                               initialOffset,
                                                                                               errorCode)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Wed Nov 23 07:21:16 2011
@@ -20,7 +20,6 @@ package kafka.log
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
 import java.io._
-import org.apache.log4j._
 import kafka.message._
 import kafka.utils._
 import kafka.common._
@@ -101,9 +100,7 @@ private[log] class LogSegment(val file: 
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) {
-
-  private val logger = Logger.getLogger(classOf[Log])
+private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
@@ -160,7 +157,7 @@ private[log] class Log(val dir: File, va
       //make the final section mutable and run recovery on it if necessary
       val last = accum.remove(accum.size - 1)
       last.messageSet.close()
-      logger.info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
+      info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
       val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
       accum.add(mutable)
     }
@@ -285,8 +282,7 @@ private[log] class Log(val dir: File, va
       val last = segments.view.last
       val newOffset = nextAppendOffset
       val newFile = new File(dir, Log.nameFromOffset(newOffset))
-      if(logger.isDebugEnabled)
-        logger.debug("Rolling log '" + name + "' to " + newFile.getName())
+      debug("Rolling log '" + name + "' to " + newFile.getName())
       segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
     }
   }
@@ -307,8 +303,7 @@ private[log] class Log(val dir: File, va
     if (unflushed.get == 0) return
 
     lock synchronized {
-      if(logger.isDebugEnabled)
-        logger.debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
+      debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
           System.currentTimeMillis)
       segments.view.last.messageSet.flush()
       unflushed.set(0)
@@ -337,9 +332,7 @@ private[log] class Log(val dir: File, va
         startIndex = 0
       case _ =>
           var isFound = false
-          if(logger.isDebugEnabled) {
-            logger.debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
-          }
+          debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
           startIndex = offsetTimeArray.length - 1
           while (startIndex >= 0 && !isFound) {
             if (offsetTimeArray(startIndex)._2 <= request.time)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Wed Nov 23 07:21:16 2011
@@ -18,7 +18,6 @@
 package kafka.log
 
 import java.io._
-import org.apache.log4j.Logger
 import kafka.utils._
 import scala.actors.Actor
 import scala.collection._
@@ -36,14 +35,13 @@ private[kafka] class LogManager(val conf
                                 private val time: Time,
                                 val logCleanupIntervalMs: Long,
                                 val logCleanupDefaultAgeMs: Long,
-                                needRecovery: Boolean) {
+                                needRecovery: Boolean) extends Logging {
   
   val logDir: File = new File(config.logDir)
   private val numPartitions = config.numPartitions
   private val maxSize: Long = config.logFileSize
   private val flushInterval = config.flushInterval
   private val topicPartitionsMap = config.topicPartitionsMap
-  private val logger = Logger.getLogger(classOf[LogManager])
   private val logCreationLock = new Object
   private val random = new java.util.Random
   private var kafkaZookeeper: KafkaZooKeeper = null
@@ -57,7 +55,7 @@ private[kafka] class LogManager(val conf
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
   if(!logDir.exists()) {
-    logger.info("No log directory found, creating '" + logDir.getAbsolutePath() + "'")
+    info("No log directory found, creating '" + logDir.getAbsolutePath() + "'")
     logDir.mkdirs()
   }
   if(!logDir.isDirectory() || !logDir.canRead())
@@ -66,9 +64,9 @@ private[kafka] class LogManager(val conf
   if(subDirs != null) {
     for(dir <- subDirs) {
       if(!dir.isDirectory()) {
-        logger.warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
+        warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
-        logger.info("Loading log '" + dir.getName() + "'")
+        info("Loading log '" + dir.getName() + "'")
         val log = new Log(dir, maxSize, flushInterval, needRecovery)
         val topicPartion = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
@@ -80,7 +78,7 @@ private[kafka] class LogManager(val conf
   
   /* Schedule the cleanup task to delete old logs */
   if(scheduler != null) {
-    logger.info("starting log cleaner every " + logCleanupIntervalMs + " ms")    
+    info("starting log cleaner every " + logCleanupIntervalMs + " ms")    
     scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
   }
 
@@ -96,10 +94,10 @@ private[kafka] class LogManager(val conf
                 kafkaZookeeper.registerTopicInZk(topic)
               }
               catch {
-                case e => logger.error(e) // log it and let it go
+                case e => error(e) // log it and let it go
               }
             case StopActor =>
-              logger.info("zkActor stopped")
+              info("zkActor stopped")
               exit
           }
         }
@@ -127,7 +125,7 @@ private[kafka] class LogManager(val conf
         kafkaZookeeper.registerTopicInZk(topic)
       startupLatch.countDown
     }
-    logger.info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
+    info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
     logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
   }
 
@@ -160,7 +158,7 @@ private[kafka] class LogManager(val conf
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name can't be empty")
     if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
-      logger.warn("Wrong partition " + partition + " valid partitions (0," +
+      warn("Wrong partition " + partition + " valid partitions (0," +
               (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
       throw new InvalidPartitionException("wrong partition " + partition)
     }
@@ -211,7 +209,7 @@ private[kafka] class LogManager(val conf
         log = found
       }
       else
-        logger.info("Created log for '" + topic + "'-" + partition)
+        info("Created log for '" + topic + "'-" + partition)
     }
 
     if (hasNewTopic)
@@ -223,10 +221,10 @@ private[kafka] class LogManager(val conf
   private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
     var total = 0
     for(segment <- segments) {
-      logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
+      info("Deleting log segment " + segment.file.getName() + " from " + log.name)
       Utils.swallow(logger.warn, segment.messageSet.close())
       if(!segment.file.delete()) {
-        logger.warn("Delete failed.")
+        warn("Delete failed.")
       } else {
         total += 1
       }
@@ -268,16 +266,16 @@ private[kafka] class LogManager(val conf
    * Delete any eligible logs. Return the number of segments deleted.
    */
   def cleanupLogs() {
-    logger.debug("Beginning log cleanup...")
+    debug("Beginning log cleanup...")
     val iter = getLogIterator
     var total = 0
     val startMs = time.milliseconds
     while(iter.hasNext) {
       val log = iter.next
-      logger.debug("Garbage collecting '" + log.name + "'")
+      debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
-    logger.debug("Log cleanup completed. " + total + " files deleted in " + 
+    debug("Log cleanup completed. " + total + " files deleted in " + 
                  (time.milliseconds - startMs) / 1000 + " seconds")
   }
   
@@ -316,8 +314,7 @@ private[kafka] class LogManager(val conf
   }
 
   private def flushAllLogs() = {
-    if (logger.isDebugEnabled)
-      logger.debug("flushing the high watermark of all logs")
+    debug("flushing the high watermark of all logs")
 
     for (log <- getLogIterator)
     {
@@ -326,18 +323,17 @@ private[kafka] class LogManager(val conf
         var logFlushInterval = config.defaultFlushIntervalMs
         if(logFlushIntervalMap.contains(log.getTopicName))
           logFlushInterval = logFlushIntervalMap(log.getTopicName)
-        if (logger.isDebugEnabled)
-          logger.debug(log.getTopicName + " flush interval  " + logFlushInterval +
+        debug(log.getTopicName + " flush interval  " + logFlushInterval +
             " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
       }
       catch {
         case e =>
-          logger.error("Error flushing topic " + log.getTopicName, e)
+          error("Error flushing topic " + log.getTopicName, e)
           e match {
             case _: IOException =>
-              logger.fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
+              fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
               Runtime.getRuntime.halt(1)
             case _ =>
           }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Wed Nov 23 07:21:16 2011
@@ -18,7 +18,7 @@
 package kafka.message
 
 import scala.collection.mutable
-import org.apache.log4j.Logger
+import kafka.utils.Logging
 import kafka.common.{InvalidMessageSizeException, ErrorMapping}
 import java.nio.ByteBuffer
 import java.nio.channels.WritableByteChannel
@@ -36,8 +36,7 @@ import kafka.utils.IteratorTemplate
  */
 class ByteBufferMessageSet(private val buffer: ByteBuffer,
                            private val initialOffset: Long = 0L,
-                           private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
-  private val logger = Logger.getLogger(getClass())  
+                           private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
   private var validByteCount = -1L
   private var shallowValidByteCount = -1L
 
@@ -94,10 +93,9 @@ class ByteBufferMessageSet(private val b
         val size = topIter.getInt()
         lastMessageSize = size
 
-        if(logger.isTraceEnabled) {
-          logger.trace("Remaining bytes in iterator = " + topIter.remaining)
-          logger.trace("size of data = " + size)
-        }
+        trace("Remaining bytes in iterator = " + topIter.remaining)
+        trace("size of data = " + size)
+
         if(size < 0 || topIter.remaining < size) {
           if (currValidBytes == initialOffset || size < 0)
             throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
@@ -111,16 +109,13 @@ class ByteBufferMessageSet(private val b
         val newMessage = new Message(message)
         newMessage.compressionCodec match {
           case NoCompressionCodec =>
-            if(logger.isDebugEnabled)
-              logger.debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
+            debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
             innerIter = null
             currValidBytes += 4 + size
-            if(logger.isTraceEnabled)
-              logger.trace("currValidBytes = " + currValidBytes)
+            trace("currValidBytes = " + currValidBytes)
             new MessageAndOffset(newMessage, currValidBytes)
           case _ =>
-            if(logger.isDebugEnabled)
-              logger.debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
+            debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
             innerIter = CompressionUtils.decompress(newMessage).deepIterator
             if (!innerIter.hasNext) {
               currValidBytes += 4 + lastMessageSize
@@ -132,8 +127,7 @@ class ByteBufferMessageSet(private val b
 
       override def makeNext(): MessageAndOffset = {
         val isInnerDone = innerDone()
-        if(logger.isDebugEnabled)
-          logger.debug("makeNext() in deepIterator: innerDone = " + isInnerDone)
+        debug("makeNext() in deepIterator: innerDone = " + isInnerDone)
         isInnerDone match {
           case true => makeNextOuter
           case false => {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala Wed Nov 23 07:21:16 2011
@@ -21,7 +21,7 @@ import java.io.ByteArrayOutputStream
 import java.io.IOException
 import java.io.InputStream
 import java.nio.ByteBuffer
-import org.apache.log4j.Logger
+import kafka.utils._
 
 abstract sealed class CompressionFacade(inputStream: InputStream, outputStream: ByteArrayOutputStream) {
   def close() = {
@@ -91,8 +91,7 @@ object CompressionFactory {
   }
 }
 
-object CompressionUtils {
-  private val logger = Logger.getLogger(getClass)
+object CompressionUtils extends Logging{
 
   //specify the codec which is the default when DefaultCompressionCodec is used
   private var defaultCodec: CompressionCodec = GZIPCompressionCodec
@@ -100,8 +99,7 @@ object CompressionUtils {
   def compress(messages: Iterable[Message], compressionCodec: CompressionCodec = DefaultCompressionCodec):Message = {
 	val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream()
 	
-	if(logger.isDebugEnabled)
-	  logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))
+	debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))
 
     var cf: CompressionFacade = null
 		
@@ -117,7 +115,7 @@ object CompressionUtils {
     try {
       cf.write(messageByteBuffer.array)
     } catch {
-      case e: IOException => logger.error("Error while writing to the GZIP output stream", e)
+      case e: IOException => error("Error while writing to the GZIP output stream", e)
       cf.close()
       throw e
     } finally {
@@ -146,7 +144,7 @@ object CompressionUtils {
         outputStream.write(intermediateBuffer, 0, dataRead)
       }
     }catch {
-      case e: IOException => logger.error("Error while reading from the GZIP input stream", e)
+      case e: IOException => error("Error while reading from the GZIP input stream", e)
       cf.close()
       throw e
     } finally {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala Wed Nov 23 07:21:16 2011
@@ -21,7 +21,6 @@ import java.io._
 import java.nio._
 import java.nio.channels._
 import java.util.concurrent.atomic._
-import org.apache.log4j.Logger
 
 import kafka._
 import kafka.message._
@@ -38,11 +37,10 @@ class FileMessageSet private[kafka](priv
                                     private[message] val offset: Long,
                                     private[message] val limit: Long,
                                     val mutable: Boolean,
-                                    val needRecover: AtomicBoolean) extends MessageSet {
+                                    val needRecover: AtomicBoolean) extends MessageSet with Logging {
   
   private val setSize = new AtomicLong()
   private val setHighWaterMark = new AtomicLong()
-  private val logger = Logger.getLogger(classOf[FileMessageSet])
   
   if(mutable) {
     if(limit < Long.MaxValue || offset > 0)
@@ -52,7 +50,7 @@ class FileMessageSet private[kafka](priv
       // set the file position to the end of the file for appending messages
       val startMs = System.currentTimeMillis
       val truncated = recover()
-      logger.info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 +
+      info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 +
                 " seconds. " + truncated + " bytes truncated.")
     }
     else {
@@ -63,8 +61,7 @@ class FileMessageSet private[kafka](priv
   } else {
     setSize.set(scala.math.min(channel.size(), limit) - offset)
     setHighWaterMark.set(sizeInBytes)
-    if(logger.isDebugEnabled)
-      logger.debug("initializing high water mark in immutable mode: " + highWaterMark)
+    debug("initializing high water mark in immutable mode: " + highWaterMark)
   }
   
   /**
@@ -174,11 +171,9 @@ class FileMessageSet private[kafka](priv
     channel.force(true)
     val elapsedTime = SystemTime.milliseconds - startTime
     LogFlushStats.recordFlushRequest(elapsedTime)
-    if (logger.isDebugEnabled)
-      logger.debug("flush time " + elapsedTime)
+    debug("flush time " + elapsedTime)
     setHighWaterMark.set(sizeInBytes)
-    if(logger.isDebugEnabled)
-      logger.debug("flush high water mark:" + highWaterMark)
+    debug("flush high water mark:" + highWaterMark)
   }
   
   /**
@@ -207,8 +202,7 @@ class FileMessageSet private[kafka](priv
     channel.truncate(validUpTo)
     setSize.set(validUpTo)
     setHighWaterMark.set(validUpTo)
-    if(logger.isDebugEnabled)
-      logger.info("recover high water mark:" + highWaterMark)
+    info("recover high water mark:" + highWaterMark)
     /* This should not be necessary, but fixes bug 6191269 on some OSs. */
     channel.position(validUpTo)
     needRecover.set(false)    
@@ -279,11 +273,10 @@ class LogFlushStats extends LogFlushStat
   def getNumFlushes: Long = flushRequestStats.getNumRequests
 }
 
-object LogFlushStats {
-  private val logger = Logger.getLogger(getClass())
+object LogFlushStats extends Logging {
   private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats"
   private val stats = new LogFlushStats
-  Utils.swallow(logger.warn, Utils.registerMBean(stats, LogFlushStatsMBeanName))
+  Utils.registerMBean(stats, LogFlushStatsMBeanName)
 
   def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs)
 }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala Wed Nov 23 07:21:16 2011
@@ -40,7 +40,6 @@ class SocketServer(val port: Int,
                    private val handlerFactory: Handler.HandlerMapping,
                    val maxRequestSize: Int = Int.MaxValue) {
 
-  private val logger = Logger.getLogger(classOf[SocketServer])
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
   private var acceptor: Acceptor = new Acceptor(port, processors)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala Wed Nov 23 07:21:16 2011
@@ -19,14 +19,12 @@ package kafka.network
 
 import java.nio._
 import java.nio.channels._
-import org.apache.log4j.Logger
+import kafka.utils.Logging
 
 /**
  * Represents a stateful transfer of data to or from the network
  */
-private[network] trait Transmission {
-  
-  protected val logger: Logger = Logger.getLogger(getClass())
+private[network] trait Transmission extends Logging {
   
   def complete: Boolean
   
@@ -55,8 +53,7 @@ private[kafka] trait Receive extends Tra
     var read = 0
     while(!complete) {
       read = readFrom(channel)
-      if(logger.isTraceEnabled)
-        logger.trace(read + " bytes read.")
+      trace(read + " bytes read.")
     }
     read
   }
@@ -74,8 +71,7 @@ private[kafka] trait Send extends Transm
     var written = 0
     while(!complete) {
       written = writeTo(channel)
-      if(logger.isTraceEnabled)
-        logger.trace(written + " bytes written.")
+      trace(written + " bytes written.")
     }
     written
   }
@@ -102,7 +98,7 @@ abstract class MultiSend[S <: Send](val 
   def complete: Boolean = {
     if (current == Nil) {
       if (totalWritten != expectedBytesToWrite)
-        logger.error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten)
+        error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten)
       return true
     }
     else

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala Wed Nov 23 07:21:16 2011
@@ -18,13 +18,11 @@ package kafka.producer
 
 import collection.mutable.HashMap
 import collection.mutable.Map
-import org.apache.log4j.Logger
 import collection.SortedSet
 import kafka.cluster.{Broker, Partition}
 import kafka.common.InvalidConfigException
 
 private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extends BrokerPartitionInfo {
-  private val logger = Logger.getLogger(classOf[ConfigBrokerPartitionInfo])
   private val brokerPartitions: SortedSet[Partition] = getConfigTopicPartitionInfo
   private val allBrokers = getConfigBrokerInfo
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala Wed Nov 23 07:21:16 2011
@@ -20,7 +20,6 @@ package kafka.producer
 import scala.collection.JavaConversions._
 import org.I0Itec.zkclient._
 import joptsimple._
-import org.apache.log4j.Logger
 import java.util.Arrays.asList
 import java.util.Properties
 import java.util.Random
@@ -31,8 +30,6 @@ import kafka.serializer._
 
 object ConsoleProducer { 
 
-  private val logger = Logger.getLogger(getClass())
-
   def main(args: Array[String]) { 
     val parser = new OptionParser
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Wed Nov 23 07:21:16 2011
@@ -20,19 +20,18 @@ package kafka.producer
 import async.MissingConfigException
 import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.{Logger, AppenderSkeleton}
-import kafka.utils.Utils
+import kafka.utils.{Utils, Logging}
 import kafka.serializer.Encoder
 import java.util.{Properties, Date}
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 
-class KafkaLog4jAppender extends AppenderSkeleton {
+class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var port:Int = 0
   var host:String = null
   var topic:String = null
   var encoderClass:String = null
   
   private var producer:SyncProducer = null
-  private val logger = Logger.getLogger(classOf[KafkaLog4jAppender])
   private var encoder: Encoder[AnyRef] = null
   
   def getPort:Int = port
@@ -56,7 +55,7 @@ class KafkaLog4jAppender extends Appende
     if(topic == null)
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
     if(encoderClass == null) {
-      logger.info("Using default encoder - kafka.producer.DefaultStringEncoder")
+      info("Using default encoder - kafka.producer.DefaultStringEncoder")
       encoder = Utils.getObject("kafka.producer.DefaultStringEncoder")
     }else // instantiate the encoder, if present
       encoder = Utils.getObject(encoderClass)
@@ -64,15 +63,13 @@ class KafkaLog4jAppender extends Appende
     props.put("host", host)
     props.put("port", port.toString)
     producer = new SyncProducer(new SyncProducerConfig(props))
-    logger.info("Kafka producer connected to " + host + "," + port)
-    logger.info("Logging for topic: " + topic)
+    info("Kafka producer connected to " + host + "," + port)
+    info("Logging for topic: " + topic)
   }
   
   override def append(event: LoggingEvent) = {
-    if (logger.isDebugEnabled){
-      logger.debug("[" + new Date(event.getTimeStamp).toString + "]" + event.getRenderedMessage +
+    debug("[" + new Date(event.getTimeStamp).toString + "]" + event.getRenderedMessage +
             " for " + host + "," + port)
-    }
     val message = encoder.toMessage(event)
     producer.send(topic, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message))
   }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1205311&r1=1205310&r2=1205311&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Wed Nov 23 07:21:16 2011
@@ -17,7 +17,6 @@
 package kafka.producer
 
 import async.{CallbackHandler, EventHandler}
-import org.apache.log4j.Logger
 import kafka.serializer.Encoder
 import kafka.utils._
 import java.util.Properties
@@ -32,13 +31,12 @@ class Producer[K,V](config: ProducerConf
                     populateProducerPool: Boolean,
                     private var brokerPartitionInfo: BrokerPartitionInfo) /* for testing purpose only. Applications should ideally */
                                                           /* use the other constructor*/
-{
-  private val logger = Logger.getLogger(classOf[Producer[K, V]])
+extends Logging {
   private val hasShutdown = new AtomicBoolean(false)
   if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList))
     throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
   if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList))
-    logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
+    warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
   private val random = new java.util.Random
   // check if zookeeper based auto partition discovery is enabled
   private val zkEnabled = Utils.propertyExists(config.zkConnect)
@@ -115,7 +113,7 @@ class Producer[K,V](config: ProducerConf
       var numRetries: Int = 0
       while(numRetries <= config.zkReadRetries && brokerInfoOpt.isEmpty) {
         if(numRetries > 0) {
-          logger.info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again")
+          info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again")
           brokerPartitionInfo.updateInfo
         }
 
@@ -130,7 +128,7 @@ class Producer[K,V](config: ProducerConf
 
       brokerInfoOpt match {
         case Some(brokerInfo) =>
-          if(logger.isDebugEnabled) logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+          debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
                   " on partition " + brokerIdPartition.get.partId)
         case None =>
           throw new NoBrokersForPartitionException("Invalid Zookeeper state. Failed to get partition for topic: " +
@@ -153,12 +151,10 @@ class Producer[K,V](config: ProducerConf
       val brokerIdPartition = topicPartitionsList(randomBrokerId)
       val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
 
-      if(logger.isDebugEnabled)
-        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+      debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
                 " on a randomly chosen partition")
       val partition = ProducerRequest.RandomPartition
-      if(logger.isDebugEnabled)
-        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " +
+      debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " +
           brokerIdPartition.partId)
       producerPool.getProducerPoolData(pd.getTopic,
         new Partition(brokerIdPartition.brokerId, partition),
@@ -168,11 +164,9 @@ class Producer[K,V](config: ProducerConf
   }
 
   private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
-    if(logger.isDebugEnabled)
-      logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
+    debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
     val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
-    if(logger.isDebugEnabled)
-      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
+    debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
     val totalNumPartitions = topicPartitionsList.length
     if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
     topicPartitionsList
@@ -206,7 +200,7 @@ class Producer[K,V](config: ProducerConf
    */
   private def producerCbk(bid: Int, host: String, port: Int) =  {
     if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port))
-    else logger.debug("Skipping the callback since populateProducerPool = false")
+    else debug("Skipping the callback since populateProducerPool = false")
   }
 
   /**



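The same pattern repeats in every file touched above: drop the per-class Logger val, mix in Logging, and rely on the trait's by-name message parameters instead of explicit isDebugEnabled/isTraceEnabled guards. A small before/after sketch, with a hypothetical class and message invented purely for illustration:

import kafka.utils.Logging

// Hypothetical class, not part of the patch -- illustration only.
class SegmentCleaner extends Logging {
  def clean(name: String) {
    // Before this patch the equivalent call site read:
    //   if(logger.isDebugEnabled)
    //     logger.debug("Cleaning segment " + name)
    // The guard now lives inside debug(), and because the parameter is
    // by-name the string concatenation is skipped when DEBUG is off.
    debug("Cleaning segment " + name)
  }
}
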