kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-2132; Move Log4J appender to a separate module; patched by Ashish Singh; reviewed by Gwen Shapira, Aditya Auradkar and Jun Rao
Date Mon, 06 Jul 2015 23:36:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fd612a2d5 -> 2d96da05a


kafka-2132; Move Log4J appender to a separate module; patched by Ashish Singh; reviewed by
Gwen Shapira, Aditya Auradkar and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d96da05
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d96da05
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d96da05

Branch: refs/heads/trunk
Commit: 2d96da05a0af7847aca5edc6d003a18be7f5216a
Parents: fd612a2
Author: Ashish Singh <asingh@cloudera.com>
Authored: Mon Jul 6 16:36:20 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Jul 6 16:36:20 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |  60 +++++--
 checkstyle/import-control.xml                   |   9 +-
 .../kafka/producer/KafkaLog4jAppender.scala     |  97 -----------
 .../kafka/log4j/KafkaLog4jAppenderTest.scala    | 143 ----------------
 .../kafka/log4jappender/KafkaLog4jAppender.java | 167 +++++++++++++++++++
 .../log4jappender/KafkaLog4jAppenderTest.java   |  98 +++++++++++
 .../log4jappender/MockKafkaLog4jAppender.java   |  47 ++++++
 settings.gradle                                 |   2 +-
 8 files changed, 370 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 727d7c5..ab86987 100644
--- a/build.gradle
+++ b/build.gradle
@@ -132,7 +132,7 @@ subprojects {
     archives srcJar
     archives javadocJar
   }
-  
+
   plugins.withType(ScalaPlugin) {
     //source jar should also contain scala source:
     srcJar.from sourceSets.main.scala
@@ -202,20 +202,20 @@ for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_6'] ) {
   }
 }
 
-tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5',
'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar'])
{
+tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5',
'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar',
'log4j-appender:jar']) {
 }
 
-tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5',
'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar'])
{ }
+tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5',
'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar',
'log4j-appender:srcJar']) { }
 
-tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5',
'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar',
'contrib:hadoop-producer:docsJar']) { }
+tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5',
'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar',
'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar']) { }
 
-tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5',
'test_core_2_11_6', 'clients:test']) {
+tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5',
'test_core_2_11_6', 'clients:test', 'log4j-appender:test']) {
 }
 
 tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2',
'releaseTarGz_2_10_5', 'releaseTarGz_2_11_6']) {
 }
 
-tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2',
'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives',
'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives']) {
+tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2',
'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives',
'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives'])
{
 }
 
 project(':core') {
@@ -228,6 +228,7 @@ project(':core') {
 
   dependencies {
     compile project(':clients')
+    compile project(':log4j-appender')
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile 'org.apache.zookeeper:zookeeper:3.4.6'
     compile 'com.101tec:zkclient:0.5'
@@ -237,7 +238,6 @@ project(':core') {
     testCompile 'junit:junit:4.6'
     testCompile 'org.easymock:easymock:3.0'
     testCompile 'org.objenesis:objenesis:1.2'
-    testCompile project(':clients')
     if (scalaVersion.startsWith('2.10')) {
       testCompile 'org.scalatest:scalatest_2.10:1.9.1'
     } else if (scalaVersion.startsWith('2.11')) {
@@ -273,9 +273,9 @@ project(':core') {
     into "$buildDir/dependant-libs-${scalaVersion}"
   }
 
-  tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type:
Tar) { 
+  tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type:
Tar) {
     into "kafka_${baseScalaVersion}-${version}"
-    compression = Compression.GZIP 
+    compression = Compression.GZIP
     from(project.file("../bin")) { into "bin/" }
     from(project.file("../config")) { into "config/" }
     from '../LICENSE'
@@ -378,7 +378,7 @@ project(':clients') {
     compile 'org.xerial.snappy:snappy-java:1.1.1.7'
     compile 'net.jpountz.lz4:lz4:1.2.0'
 
-    testCompile 'com.novocode:junit-interface:0.9'
+    testCompile 'junit:junit:4.6'
     testRuntime "$slf4jlog4j"
   }
 
@@ -405,7 +405,45 @@ project(':clients') {
   artifacts {
     archives testJar
   }
-  
+
+  configurations {
+    archives.extendsFrom (testCompile)
+  }
+
+  checkstyle {
+     configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
+project(':log4j-appender') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "kafka-log4j-appender"
+
+  dependencies {
+    compile project(':clients')
+    compile "$slf4jlog4j"
+
+    testCompile 'junit:junit:4.6'
+    testCompile project(path: ':clients', configuration: 'archives')
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+        events "passed", "skipped", "failed"
+        exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/log4jappender/*"
+  }
+
   checkstyle {
      configFile = new File(rootDir, "checkstyle/checkstyle.xml")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f2e6cec..19e0659 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -95,8 +95,15 @@
 		</subpackage>
 	</subpackage>
 
+	<subpackage name="log4jappender">
+		<allow pkg="org.apache.log4j" />
+		<allow pkg="org.apache.kafka.clients" />
+		<allow pkg="org.apache.kafka.common" />
+		<allow pkg="org.apache.kafka.test" />
+	</subpackage>
+
 	<subpackage name="test">
 		<allow pkg="org.apache.kafka" />
 	</subpackage>
-	
+
 </import-control>

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
deleted file mode 100644
index 5d36a01..0000000
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer
-
-import async.MissingConfigException
-import org.apache.log4j.spi.LoggingEvent
-import org.apache.log4j.AppenderSkeleton
-import org.apache.log4j.helpers.LogLog
-import kafka.utils.Logging
-import java.util.{Properties, Date}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-
-class KafkaLog4jAppender extends AppenderSkeleton with Logging {
-  var topic: String = null
-  var brokerList: String = null
-  var compressionType: String = null
-  var retries: Int = 0
-  var requiredNumAcks: Int = Int.MaxValue
-  var syncSend: Boolean = false
-
-  private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
-
-  def getTopic: String = topic
-  def setTopic(topic: String) { this.topic = topic }
-
-  def getBrokerList: String = brokerList
-  def setBrokerList(brokerList: String) { this.brokerList = brokerList }
-
-  def getCompressionType: String = compressionType
-  def setCompressionType(compressionType: String) { this.compressionType = compressionType
}
-
-  def getRequiredNumAcks: Int = requiredNumAcks
-  def setRequiredNumAcks(requiredNumAcks: Int) { this.requiredNumAcks = requiredNumAcks }
-
-  def getSyncSend: Boolean = syncSend
-  def setSyncSend(syncSend: Boolean) { this.syncSend = syncSend }
-
-  def getRetries: Int = retries
-  def setRetries(retries: Int) { this.retries = retries }
-
-  override def activateOptions() {
-    // check for config parameter validity
-    val props = new Properties()
-    if(brokerList != null)
-      props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerList)
-    if(props.isEmpty)
-      throw new MissingConfigException("The bootstrap servers property should be specified")
-    if(topic == null)
-      throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
-    if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG,
compressionType)
-    if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG,
requiredNumAcks.toString)
-    if(retries > 0) props.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG,
retries.toString)
-    props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
-    props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
-    producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
-    LogLog.debug("Kafka producer connected to " +  brokerList)
-    LogLog.debug("Logging for topic: " + topic)
-  }
-
-  override def append(event: LoggingEvent)  {
-    val message = subAppend(event)
-    LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
-    val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message.getBytes()))
-    if (syncSend) response.get
-  }
-
-  def subAppend(event: LoggingEvent): String = {
-    if(this.layout == null)
-      event.getRenderedMessage
-    else
-      this.layout.format(event)
-  }
-
-  override def close() {
-    if(!this.closed) {
-      this.closed = true
-      producer.close()
-    }
-  }
-
-  override def requiresLayout: Boolean = true
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
deleted file mode 100755
index 41366a1..0000000
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log4j
-
-import kafka.consumer.SimpleConsumer
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, CoreUtils, Logging}
-import kafka.api.FetchRequestBuilder
-import kafka.producer.async.MissingConfigException
-import kafka.serializer.Encoder
-import kafka.zk.ZooKeeperTestHarness
-
-import java.util.Properties
-import java.io.File
-
-import org.apache.log4j.spi.LoggingEvent
-import org.apache.log4j.{PropertyConfigurator, Logger}
-import org.junit.{After, Before, Test}
-import org.scalatest.junit.JUnit3Suite
-
-import junit.framework.Assert._
-
-class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
-
-  var logDirZk: File = null
-  var config: KafkaConfig = null
-  var server: KafkaServer = null
-
-  var simpleConsumerZk: SimpleConsumer = null
-
-  val tLogger = Logger.getLogger(getClass())
-
-  private val brokerZk = 0
-
-  @Before
-  override def setUp() {
-    super.setUp()
-
-    val propsZk = TestUtils.createBrokerConfig(brokerZk, zkConnect)
-    val logDirZkPath = propsZk.getProperty("log.dir")
-    logDirZk = new File(logDirZkPath)
-    config = KafkaConfig.fromProps(propsZk)
-    server = TestUtils.createServer(config)
-    simpleConsumerZk = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64 *
1024, "")
-  }
-
-  @After
-  override def tearDown() {
-    simpleConsumerZk.close
-    server.shutdown
-    CoreUtils.rm(logDirZk)
-    super.tearDown()
-  }
-
-  @Test
-  def testKafkaLog4jConfigs() {
-    // host missing
-    var props = new Properties()
-    props.put("log4j.rootLogger", "INFO")
-    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
-
-    try {
-      PropertyConfigurator.configure(props)
-      fail("Missing properties exception was expected !")
-    } catch {
-      case e: MissingConfigException =>
-    }
-
-    // topic missing
-    props = new Properties()
-    props.put("log4j.rootLogger", "INFO")
-    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromServers(Seq(server)))
-    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
-
-    try {
-      PropertyConfigurator.configure(props)
-      fail("Missing properties exception was expected !")
-    } catch {
-      case e: MissingConfigException =>
-    }
-  }
-
-  @Test
-  def testLog4jAppends() {
-    PropertyConfigurator.configure(getLog4jConfig)
-
-    for(i <- 1 to 5)
-      info("test")
-
-    val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic",
0, 0L, 1024*1024).build())
-    val fetchMessage = response.messageSet("test-topic", 0)
-
-    var count = 0
-    for(message <- fetchMessage) {
-      count = count + 1
-    }
-
-    assertEquals(5, count)
-  }
-
-  private def getLog4jConfig: Properties = {
-    val props = new Properties()
-    props.put("log4j.rootLogger", "INFO")
-    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromServers(Seq(server)))
-    props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.appender.KAFKA.RequiredNumAcks", "1")
-    props.put("log4j.appender.KAFKA.SyncSend", "true")
-    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
-    props
-  }
-}
-
-class AppenderStringEncoder(encoding: String = "UTF-8") extends Encoder[LoggingEvent] {
-  def toBytes(event: LoggingEvent): Array[Byte] = {
-    event.getMessage.toString.getBytes(encoding)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
----------------------------------------------------------------------
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
new file mode 100644
index 0000000..628ff53
--- /dev/null
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.log4jappender;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * A log4j appender that produces log messages to Kafka
+ */
+public class KafkaLog4jAppender extends AppenderSkeleton {
+
+    private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+    private static final String COMPRESSION_TYPE_CONFIG = "compression.type";
+    private static final String ACKS_CONFIG = "acks";
+    private static final String RETRIES_CONFIG = "retries";
+    private static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
+    private static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
+
+    private String brokerList = null;
+    private String topic = null;
+    private String compressionType = null;
+
+    private int retries = 0;
+    private int requiredNumAcks = Integer.MAX_VALUE;
+    private boolean syncSend = false;
+    private Producer<byte[], byte[]> producer = null;
+
+    public Producer<byte[], byte[]> getProducer() {
+        return producer;
+    }
+
+    public String getBrokerList() {
+        return brokerList;
+    }
+
+    public void setBrokerList(String brokerList) {
+        this.brokerList = brokerList;
+    }
+
+    public int getRequiredNumAcks() {
+        return requiredNumAcks;
+    }
+
+    public void setRequiredNumAcks(int requiredNumAcks) {
+        this.requiredNumAcks = requiredNumAcks;
+    }
+
+    public int getRetries() {
+        return retries;
+    }
+
+    public void setRetries(int retries) {
+        this.retries = retries;
+    }
+
+    public String getCompressionType() {
+        return compressionType;
+    }
+
+    public void setCompressionType(String compressionType) {
+        this.compressionType = compressionType;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public boolean getSyncSend() {
+        return syncSend;
+    }
+
+    public void setSyncSend(boolean syncSend) {
+        this.syncSend = syncSend;
+    }
+
+    @Override
+    public void activateOptions() {
+        // check for config parameter validity
+        Properties props = new Properties();
+        if (brokerList != null)
+            props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        if (props.isEmpty())
+            throw new ConfigException("The bootstrap servers property should be specified");
+        if (topic == null)
+            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
+        if (compressionType != null)
+            props.put(COMPRESSION_TYPE_CONFIG, compressionType);
+        if (requiredNumAcks != Integer.MAX_VALUE)
+            props.put(ACKS_CONFIG, requiredNumAcks);
+        if (retries > 0)
+            props.put(RETRIES_CONFIG, retries);
+
+        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        this.producer = getKafkaProducer(props);
+        LogLog.debug("Kafka producer connected to " + brokerList);
+        LogLog.debug("Logging for topic: " + topic);
+    }
+
+    protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
+        return new KafkaProducer<byte[], byte[]>(props);
+    }
+
+    @Override
+    protected void append(LoggingEvent event) {
+        String message = subAppend(event);
+        LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
+        Future<RecordMetadata> response = producer.send(new ProducerRecord<byte[],
byte[]>(topic, message.getBytes()));
+        if (syncSend) {
+            try {
+                response.get();
+            } catch (InterruptedException ex) {
+                throw new RuntimeException(ex);
+            } catch (ExecutionException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+
+    private String subAppend(LoggingEvent event) {
+        return (this.layout == null) ? event.getRenderedMessage() : this.layout.format(event);
+    }
+
+    @Override
+    public void close() {
+        if (!this.closed) {
+            this.closed = true;
+            producer.close();
+        }
+    }
+
+    @Override
+    public boolean requiresLayout() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
----------------------------------------------------------------------
diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
new file mode 100644
index 0000000..71bdd94
--- /dev/null
+++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.log4jappender;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+
+public class KafkaLog4jAppenderTest {
+
+    Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);
+
+    @Test
+    public void testKafkaLog4jConfigs() {
+        // host missing
+        Properties props = new Properties();
+        props.put("log4j.rootLogger", "INFO");
+        props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
+        props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+        props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+        props.put("log4j.appender.KAFKA.Topic", "test-topic");
+        props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+
+        try {
+            PropertyConfigurator.configure(props);
+            Assert.fail("Missing properties exception was expected !");
+        } catch (ConfigException ex) {
+            // It's OK!
+        }
+
+        // topic missing
+        props = new Properties();
+        props.put("log4j.rootLogger", "INFO");
+        props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
+        props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+        props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+        props.put("log4j.appender.KAFKA.brokerList", "127.0.0.1:9093");
+        props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+
+        try {
+            PropertyConfigurator.configure(props);
+            Assert.fail("Missing properties exception was expected !");
+        } catch (ConfigException ex) {
+            // It's OK!
+        }
+    }
+
+
+    @Test
+    public void testLog4jAppends() throws UnsupportedEncodingException {
+        PropertyConfigurator.configure(getLog4jConfig());
+
+        for (int i = 1; i <= 5; ++i) {
+            logger.error(getMessage(i));
+        }
+
+        Assert.assertEquals(
+                5, ((MockKafkaLog4jAppender) (logger.getRootLogger().getAppender("KAFKA"))).getHistory().size());
+    }
+
+    private byte[] getMessage(int i) throws UnsupportedEncodingException {
+        return ("test_" + i).getBytes("UTF-8");
+    }
+
+    private Properties getLog4jConfig() {
+        Properties props = new Properties();
+        props.put("log4j.rootLogger", "INFO, KAFKA");
+        props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.MockKafkaLog4jAppender");
+        props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+        props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
+        props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.1:9093");
+        props.put("log4j.appender.KAFKA.Topic", "test-topic");
+        props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
+        props.put("log4j.appender.KAFKA.SyncSend", "false");
+        props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
+        return props;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
----------------------------------------------------------------------
diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
new file mode 100644
index 0000000..c35f26a
--- /dev/null
+++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.log4jappender;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.test.MockSerializer;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.Properties;
+
+public class MockKafkaLog4jAppender extends KafkaLog4jAppender {
+    private MockProducer<byte[], byte[]> mockProducer =
+            new MockProducer<byte[], byte[]>(false, new MockSerializer(), new MockSerializer());
+
+    @Override
+    protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
+        return mockProducer;
+    }
+
+    @Override
+    protected void append(LoggingEvent event) {
+        if (super.getProducer() == null) {
+            activateOptions();
+        }
+        super.append(event);
+    }
+
+    protected java.util.List<ProducerRecord<byte[], byte[]>> getHistory() {
+        return mockProducer.history();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d96da05/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 83f764e..3b6a952 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,4 +14,4 @@
 // limitations under the License.
 
 apply from: file('scala.gradle')
-include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients'
+include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients',
'log4j-appender'


Mime
View raw message