kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1231276 - in /incubator/kafka/trunk/core/src: main/scala/kafka/producer/KafkaLog4jAppender.scala main/scala/kafka/producer/ProducerConfig.scala test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
Date Fri, 13 Jan 2012 19:53:33 GMT
Author: nehanarkhede
Date: Fri Jan 13 19:53:32 2012
New Revision: 1231276

URL: http://svn.apache.org/viewvc?rev=1231276&view=rev
Log:
KAFKA 244 Improve log4j appender to use kafka.producer.Producer; patched by vtkstef; reviewed by nehanarkhede
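
For context, a minimal sketch of wiring up the reworked appender, mirroring the updated test configuration further down: log4j is configured programmatically with a layout, a topic, and exactly one of BrokerList or ZkConnect. The broker string "0:localhost:9092" and the topic name are placeholder assumptions, not values from this commit.

    import java.util.Properties
    import org.apache.log4j.{Logger, PropertyConfigurator}

    object KafkaAppenderSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("log4j.rootLogger", "INFO")
        props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
        props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
        props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
        // Exactly one of BrokerList or ZkConnect is needed; placeholder broker string here.
        props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:9092")
        props.put("log4j.appender.KAFKA.Topic", "test-topic")
        props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
        PropertyConfigurator.configure(props)

        // Events logged under kafka.log4j are formatted by the layout and sent to the topic.
        Logger.getLogger("kafka.log4j").info("hello from the Kafka log4j appender")
      }
    }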

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1231276&r1=1231275&r2=1231276&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Fri Jan 13 19:53:32 2012
@@ -18,66 +18,77 @@
 package kafka.producer
 
 import async.MissingConfigException
-import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.spi.{LoggingEvent, ErrorCode}
 import org.apache.log4j.{Logger, AppenderSkeleton}
+import org.apache.log4j.helpers.LogLog
 import kafka.utils.{Utils, Logging}
 import kafka.serializer.Encoder
 import java.util.{Properties, Date}
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import scala.collection._
 
 class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var port:Int = 0
   var host:String = null
   var topic:String = null
-  var encoderClass:String = null
+  var serializerClass:String = null
+  var zkConnect:String = null
+  var brokerList:String = null
   
-  private var producer:SyncProducer = null
-  private var encoder: Encoder[AnyRef] = null
-  
-  def getPort:Int = port
-  def setPort(port: Int) = { this.port = port }
-
-  def getHost:String = host
-  def setHost(host: String) = { this.host = host }
+  private var producer: Producer[String, String] = null
 
   def getTopic:String = topic
-  def setTopic(topic: String) = { this.topic = topic }
+  def setTopic(topic: String) { this.topic = topic }
 
-  def getEncoder:String = encoderClass
-  def setEncoder(encoder: String) = { this.encoderClass = encoder }
+  def getZkConnect:String = zkConnect
+  def setZkConnect(zkConnect: String) { this.zkConnect = zkConnect }
   
-  override def activateOptions = {
+  def getBrokerList:String = brokerList
+  def setBrokerList(brokerList: String) { this.brokerList = brokerList }
+  
+  def getSerializerClass:String = serializerClass
+  def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
+
+  override def activateOptions() {
+    val connectDiagnostic : mutable.ListBuffer[String] = mutable.ListBuffer();
     // check for config parameter validity
-    if(host == null)
-      throw new MissingConfigException("Broker Host must be specified by the Kafka log4j
appender")
-    if(port == 0)
-      throw new MissingConfigException("Broker Port must be specified by the Kafka log4j
appender") 
+    val props = new Properties()
+    if( zkConnect == null) connectDiagnostic += "zkConnect"
+    else props.put("zk.connect", zkConnect);
+    if( brokerList == null) connectDiagnostic += "brokerList"
+    else if( props.isEmpty) props.put("broker.list", brokerList)
+    if(props.isEmpty )
+      throw new MissingConfigException(
+        connectDiagnostic mkString ("One of these connection properties must be specified: ", ", ", ".")
+      )
     if(topic == null)
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
-    if(encoderClass == null) {
-      info("Using default encoder - kafka.producer.DefaultStringEncoder")
-      encoder = Utils.getObject("kafka.producer.DefaultStringEncoder")
-    }else // instantiate the encoder, if present
-      encoder = Utils.getObject(encoderClass)
-    val props = new Properties()
-    props.put("host", host)
-    props.put("port", port.toString)
-    producer = new SyncProducer(new SyncProducerConfig(props))
-    info("Kafka producer connected to " + host + "," + port)
-    info("Logging for topic: " + topic)
+    if(serializerClass == null) {
+      serializerClass = "kafka.serializer.StringEncoder"
+      LogLog.warn("Using default encoder - kafka.serializer.StringEncoder")
+    }
+    props.put("serializer.class", serializerClass)
+    val config : ProducerConfig = new ProducerConfig(props)
+    producer = new Producer[String, String](config)
+    LogLog.debug("Kafka producer connected to " + (if(config.zkConnect == null) config.brokerList
else config.zkConnect))
+    LogLog.debug("Logging for topic: " + topic)
   }
   
-  override def append(event: LoggingEvent) = {
-    debug("[" + new Date(event.getTimeStamp).toString + "]" + event.getRenderedMessage +
-            " for " + host + "," + port)
-    val message = encoder.toMessage(event)
-    producer.send(topic, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message))
+  override def append(event: LoggingEvent)  {
+    val message : String = if( this.layout == null) {
+      event.getRenderedMessage
+    }
+    else this.layout.format(event)
+    LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
+    val messageData : ProducerData[String, String] =
+      new ProducerData[String, String](topic, message)
+    producer.send(messageData);
   }
 
-  override def close = {
+  override def close() {
     if(!this.closed) {
       this.closed = true
-      producer.close
+      producer.close()
     }
   }
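
A hedged sketch of the connection-property check introduced in activateOptions() above: if neither ZkConnect nor BrokerList is supplied, configuring log4j fails with a MissingConfigException whose message is built from connectDiagnostic. The try/catch mirrors the pattern used in the test changes below; property values are placeholders.

    // Scala script-style sketch; run against the kafka and log4j jars.
    import java.util.Properties
    import org.apache.log4j.PropertyConfigurator
    import kafka.producer.async.MissingConfigException

    val props = new Properties()
    props.put("log4j.rootLogger", "INFO")
    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
    props.put("log4j.appender.KAFKA.Topic", "test-topic")
    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
    try {
      PropertyConfigurator.configure(props)  // neither ZkConnect nor BrokerList is set
    } catch {
      case e: MissingConfigException =>
        // Expected: "One of these connection properties must be specified: zkConnect, brokerList."
        println(e.getMessage)
    }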
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1231276&r1=1231275&r2=1231276&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Fri Jan 13 19:53:32 2012
@@ -32,6 +32,10 @@ class ProducerConfig(val props: Properti
   if(brokerList != null && Utils.getString(props, "partitioner.class", null) != null)
    throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
 
+  /** If both broker.list and zk.connect options are specified, throw an exception */
+  if(brokerList != null && zkConnect != null)
+    throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")
+
   /** the partitioner class for partitioning events amongst sub-topics */
   val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
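
An illustrative sketch of the new mutual-exclusion check above: a ProducerConfig given both zk.connect and broker.list is rejected. The addresses below are placeholders, and the kafka.common import path for InvalidConfigException is an assumption about the surrounding code base.

    // Scala script-style sketch of the broker.list / zk.connect exclusivity check.
    import java.util.Properties
    import kafka.producer.ProducerConfig
    import kafka.common.InvalidConfigException   // import path assumed

    val props = new Properties()
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("zk.connect", "localhost:2181")      // placeholder ZooKeeper connect string
    props.put("broker.list", "0:localhost:9092")   // placeholder broker list
    try {
      new ProducerConfig(props)
    } catch {
      case e: InvalidConfigException =>
        println(e.getMessage)  // "only one of broker.list and zk.connect can be specified"
    }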
 

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1231276&r1=1231275&r2=1231276&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Fri Jan 13 19:53:32 2012
@@ -24,7 +24,8 @@ import java.io.File
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils
-import kafka.utils.{Utils, Logging}
+import kafka.utils.{TestUtils, TestZKUtils,Utils, Logging}
+import kafka.zk.EmbeddedZookeeper
 import junit.framework.Assert._
 import kafka.api.FetchRequest
 import kafka.serializer.Encoder
@@ -35,30 +36,58 @@ import org.junit.{After, Before, Test}
 
 class KafkaLog4jAppenderTest extends JUnitSuite with Logging {
 
-  var logDir: File = null
+  var logDirZk: File = null
+  var logDirBl: File = null
   //  var topicLogDir: File = null
-  var server: KafkaServer = null
-  val brokerPort: Int = 9092
-  var simpleConsumer: SimpleConsumer = null
+  var serverBl: KafkaServer = null
+  var serverZk: KafkaServer = null
+
+  var simpleConsumerZk: SimpleConsumer = null
+  var simpleConsumerBl: SimpleConsumer = null
+
   val tLogger = Logger.getLogger(getClass())
 
+  private val brokerZk = 0
+  private val brokerBl = 1
+
+  private val ports = TestUtils.choosePorts(2)
+  private val (portZk, portBl) = (ports(0), ports(1))
+
+  private var zkServer:EmbeddedZookeeper = null
+
   @Before
   def setUp() {
-    val config: Properties = createBrokerConfig(1, brokerPort)
-    val logDirPath = config.getProperty("log.dir")
-    logDir = new File(logDirPath)
+    zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect)
+
+    val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
+    val logDirZkPath = propsZk.getProperty("log.dir")
+    logDirZk = new File(logDirZkPath)
+    serverZk = TestUtils.createServer(new KafkaConfig(propsZk));
+
+    val propsBl: Properties = createBrokerConfig(brokerBl, portBl)
+    val logDirBlPath = propsBl.getProperty("log.dir")
+    logDirBl = new File(logDirBlPath)
+    serverBl = TestUtils.createServer(new KafkaConfig(propsBl))
 
-    server = TestUtils.createServer(new KafkaConfig(config))
     Thread.sleep(100)
-    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+
+    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
+    simpleConsumerBl = new SimpleConsumer("localhost", portBl, 1000000, 64*1024)
   }
 
   @After
   def tearDown() {
-    simpleConsumer.close
-    server.shutdown
-    Thread.sleep(100)
-    Utils.rm(logDir)
+    simpleConsumerZk.close
+    simpleConsumerBl.close
+
+    serverZk.shutdown
+    serverBl.shutdown
+    Utils.rm(logDirZk)
+    Utils.rm(logDirBl)
+
+    Thread.sleep(500)
+    zkServer.shutdown
+    Thread.sleep(500)
   }
 
   @Test
@@ -66,9 +95,10 @@ class KafkaLog4jAppenderTest extends JUn
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.Host", "localhost")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.encoder", "kafka.log4j.AppenderStringEncoder")
+    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // port missing
@@ -82,9 +112,10 @@ class KafkaLog4jAppenderTest extends JUn
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder")
-    props.put("log4j.appender.KAFKA.Port", "9092")
+    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // host missing
@@ -98,9 +129,10 @@ class KafkaLog4jAppenderTest extends JUn
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.Host", "localhost")
-    props.put("log4j.appender.KAFKA.Port", "9092")
-    props.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
+    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:"+portBl.toString)
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // topic missing
@@ -114,22 +146,23 @@ class KafkaLog4jAppenderTest extends JUn
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.Host", "localhost")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:"+portBl.toString)
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.Port", "9092")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // serializer missing
     try {
       PropertyConfigurator.configure(props)
     }catch {
-      case e: MissingConfigException => fail("should default to kafka.producer.DefaultStringEncoder")
+      case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
     }
   }
 
   @Test
-  def testLog4jAppends() {
-    PropertyConfigurator.configure(getLog4jConfig)
+  def testBrokerListLog4jAppends() {
+    PropertyConfigurator.configure(getLog4jConfigWithBrokerList)
 
     for(i <- 1 to 5)
       info("test")
@@ -137,7 +170,7 @@ class KafkaLog4jAppenderTest extends JUn
     Thread.sleep(500)
 
     var offset = 0L
-    val messages = simpleConsumer.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))
+    val messages = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))
 
     var count = 0
     for(message <- messages) {
@@ -148,15 +181,48 @@ class KafkaLog4jAppenderTest extends JUn
     assertEquals(5, count)
   }
 
+  @Test
+  def testZkConnectLog4jAppends() {
+    PropertyConfigurator.configure(getLog4jConfigWithZkConnect)
 
-  private def getLog4jConfig: Properties = {
+    for(i <- 1 to 5)
+      info("test")
+
+    Thread.sleep(500)
+
+    var offset = 0L
+    val messages = simpleConsumerZk.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))
+
+    var count = 0
+    for(message <- messages) {
+      count = count + 1
+      offset += message.offset
+    }
+
+    assertEquals(5, count)
+  }
+
+  private def getLog4jConfigWithBrokerList: Properties = {
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.Port", "9092")
-    props.put("log4j.appender.KAFKA.Host", "localhost")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:"+portBl.toString)
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
+    props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
+    props
+  }
+
+  private def getLog4jConfigWithZkConnect: Properties = {
+    var props = new Properties()
+    props.put("log4j.rootLogger", "INFO")
+    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
+    props.put("log4j.appender.KAFKA.Topic", "test-topic")
+    props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
     props
   }
 


