kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2447: Add capability to KafkaLog4jAppender to be able to use SSL
Date Tue, 27 Oct 2015 15:45:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2fd645ac2 -> d21cb66e7


KAFKA-2447: Add capability to KafkaLog4jAppender to be able to use SSL

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Gwen Shapira, Ismael Juma

Closes #175 from SinghAsDev/KAFKA-2447


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d21cb66e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d21cb66e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d21cb66e

Branch: refs/heads/trunk
Commit: d21cb66e7d21ed3d20fc1e13b9a856f764bb4237
Parents: 2fd645a
Author: Ashish Singh <asingh@cloudera.com>
Authored: Tue Oct 27 08:45:27 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Oct 27 08:45:27 2015 -0700

----------------------------------------------------------------------
 .../kafka/log4jappender/KafkaLog4jAppender.java | 88 ++++++++++++++++++--
 1 file changed, 82 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d21cb66e/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
----------------------------------------------------------------------
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index 2baef06..94120e2 100644
--- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -17,11 +17,14 @@
 
 package org.apache.kafka.log4jappender;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SSLConfigs;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
@@ -36,16 +39,28 @@ import java.util.concurrent.Future;
  */
 public class KafkaLog4jAppender extends AppenderSkeleton {
 
-    private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
-    private static final String COMPRESSION_TYPE_CONFIG = "compression.type";
-    private static final String ACKS_CONFIG = "acks";
-    private static final String RETRIES_CONFIG = "retries";
-    private static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
-    private static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
+    private static final String BOOTSTRAP_SERVERS_CONFIG = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+    private static final String COMPRESSION_TYPE_CONFIG = ProducerConfig.COMPRESSION_TYPE_CONFIG;
+    private static final String ACKS_CONFIG = ProducerConfig.ACKS_CONFIG;
+    private static final String RETRIES_CONFIG = ProducerConfig.RETRIES_CONFIG;
+    private static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+    private static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+    private static final String SECURITY_PROTOCOL = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+    private static final String SSL_TRUSTSTORE_LOCATION = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+    private static final String SSL_TRUSTSTORE_PASSWORD = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+    private static final String SSL_KEYSTORE_TYPE = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG;
+    private static final String SSL_KEYSTORE_LOCATION = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+    private static final String SSL_KEYSTORE_PASSWORD = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
 
     private String brokerList = null;
     private String topic = null;
     private String compressionType = null;
+    private String securityProtocol = null;
+    private String sslTruststoreLocation = null;
+    private String sslTruststorePassword = null;
+    private String sslKeystoreType = null;
+    private String sslKeystoreLocation = null;
+    private String sslKeystorePassword = null;
 
     private int retries = 0;
     private int requiredNumAcks = Integer.MAX_VALUE;
@@ -104,6 +119,54 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         this.syncSend = syncSend;
     }
 
+    public String getSslTruststorePassword() {
+        return sslTruststorePassword;
+    }
+
+    public String getSslTruststoreLocation() {
+        return sslTruststoreLocation;
+    }
+
+    public String getSecurityProtocol() {
+        return securityProtocol;
+    }
+
+    public void setSecurityProtocol(String securityProtocol) {
+        this.securityProtocol = securityProtocol;
+    }
+
+    public void setSslTruststoreLocation(String sslTruststoreLocation) {
+        this.sslTruststoreLocation = sslTruststoreLocation;
+    }
+
+    public void setSslTruststorePassword(String sslTruststorePassword) {
+        this.sslTruststorePassword = sslTruststorePassword;
+    }
+
+    public void setSslKeystorePassword(String sslKeystorePassword) {
+        this.sslKeystorePassword = sslKeystorePassword;
+    }
+
+    public void setSslKeystoreType(String sslKeystoreType) {
+        this.sslKeystoreType = sslKeystoreType;
+    }
+
+    public void setSslKeystoreLocation(String sslKeystoreLocation) {
+        this.sslKeystoreLocation = sslKeystoreLocation;
+    }
+
+    public String getSslKeystoreLocation() {
+        return sslKeystoreLocation;
+    }
+
+    public String getSslKeystoreType() {
+        return sslKeystoreType;
+    }
+
+    public String getSslKeystorePassword() {
+        return sslKeystorePassword;
+    }
+
     @Override
     public void activateOptions() {
         // check for config parameter validity
@@ -120,6 +183,19 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
             props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
         if (retries > 0)
             props.put(RETRIES_CONFIG, retries);
+        if (securityProtocol != null && sslTruststoreLocation != null &&
+            sslTruststorePassword != null) {
+            props.put(SECURITY_PROTOCOL, securityProtocol);
+            props.put(SSL_TRUSTSTORE_LOCATION, sslTruststoreLocation);
+            props.put(SSL_TRUSTSTORE_PASSWORD, sslTruststorePassword);
+
+            if (sslKeystoreType != null && sslKeystoreLocation != null &&
+                sslKeystorePassword != null) {
+                props.put(SSL_KEYSTORE_TYPE, sslKeystoreType);
+                props.put(SSL_KEYSTORE_LOCATION, sslKeystoreLocation);
+                props.put(SSL_KEYSTORE_PASSWORD, sslKeystorePassword);
+            }
+        }
 
         props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");


Mime
View raw message