kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3078: Add ducktape tests for KafkaLog4jAppender producing to SASL enabled Kafka cluster
Date Tue, 12 Jan 2016 07:15:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2adeb214b -> 3e5afbfa0


KAFKA-3078: Add ducktape tests for KafkaLog4jAppender producing to SASL enabled Kafka cluster

Note that KAFKA-3077 will be required to run these tests.

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #747 from SinghAsDev/KAFKA-3078


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e5afbfa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e5afbfa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e5afbfa

Branch: refs/heads/trunk
Commit: 3e5afbfa0dd4ddfca65fae1f3b2a268ae1ed2025
Parents: 2adeb21
Author: Ashish Singh <asingh@cloudera.com>
Authored: Mon Jan 11 23:15:42 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Jan 11 23:15:42 2016 -0800

----------------------------------------------------------------------
 .../kafkatest/services/kafka_log4j_appender.py  | 12 +++++--
 tests/kafkatest/tests/log4j_appender_test.py    | 17 ++++++---
 .../kafka/tools/VerifiableLog4jAppender.java    | 36 ++++++++++++++++++--
 3 files changed, 56 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3e5afbfa/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index 0cc39c0..3732bb0 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -49,10 +49,18 @@ class KafkaLog4jAppender(BackgroundThreadService):
 
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
-        if self.security_protocol == SecurityConfig.SSL:
-            cmd += " --security-protocol SSL"
+        if self.security_protocol != SecurityConfig.PLAINTEXT:
+            cmd += " --security-protocol %s" % str(self.security_protocol)
+        if self.security_protocol == SecurityConfig.SSL or self.security_protocol == SecurityConfig.SASL_SSL:
             cmd += " --ssl-truststore-location %s" % str(SecurityConfig.TRUSTSTORE_PATH)
             cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores['ssl.truststore.password'])
+        if self.security_protocol == SecurityConfig.SASL_PLAINTEXT or \
+                self.security_protocol == SecurityConfig.SASL_SSL or \
+                self.security_protocol == SecurityConfig.SASL_MECHANISM_GSSAPI or \
+                self.security_protocol == SecurityConfig.SASL_MECHANISM_PLAIN:
+            cmd += " --sasl-kerberos-service-name %s" % str('kafka')
+            cmd += " --client-jaas-conf-path %s" % str(SecurityConfig.JAAS_CONF_PATH)
+            cmd += " --kerb5-conf-path %s" % str(SecurityConfig.KRB5CONF_PATH)
 
         cmd += " 2>> /mnt/kafka_log4j_appender.log | tee -a /mnt/kafka_log4j_appender.log &"
         return cmd

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e5afbfa/tests/kafkatest/tests/log4j_appender_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/log4j_appender_test.py b/tests/kafkatest/tests/log4j_appender_test.py
index db33d76..42cfeea 100644
--- a/tests/kafkatest/tests/log4j_appender_test.py
+++ b/tests/kafkatest/tests/log4j_appender_test.py
@@ -35,6 +35,7 @@ class Log4jAppenderTest(Test):
         super(Log4jAppenderTest, self).__init__(test_context)
         self.num_zk = 1
         self.num_brokers = 1
+        self.messages_received_count = 0
         self.topics = {
             TOPIC: {'partitions': 1, 'replication-factor': 1}
         }
@@ -56,13 +57,20 @@ class Log4jAppenderTest(Test):
                                            security_protocol=security_protocol)
         self.appender.start()
 
+    def custom_message_validator(self, msg):
+        if msg and "INFO : org.apache.kafka.tools.VerifiableLog4jAppender" in msg:
+            self.logger.debug("Received message: %s" % msg)
+            self.messages_received_count += 1
+
+
     def start_consumer(self, security_protocol):
-        enable_new_consumer = security_protocol == SecurityConfig.SSL
+        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
         self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
-                                        consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
+                                        consumer_timeout_ms=1000, new_consumer=enable_new_consumer,
+                                        message_validator=self.custom_message_validator)
         self.consumer.start()
 
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
     def test_log4j_appender(self, security_protocol='PLAINTEXT'):
         """
         Tests if KafkaLog4jAppender is producing to Kafka topic
@@ -79,8 +87,7 @@ class Log4jAppenderTest(Test):
             timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
 
         # Verify consumed messages count
-        expected_lines_count = MAX_MESSAGES * 2  # two times to account for new lines introduced by log4j
-        wait_until(lambda: len(self.consumer.messages_consumed[1]) == expected_lines_count, timeout_sec=10,
+        wait_until(lambda: self.messages_received_count == MAX_MESSAGES, timeout_sec=10,
                    err_msg="Timed out waiting to consume expected number of messages.")
 
         self.consumer.stop()

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e5afbfa/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
index a48b301..ffbf7dc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
@@ -21,6 +21,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 
@@ -96,7 +97,7 @@ public class VerifiableLog4jAppender {
             .required(false)
             .setDefault("PLAINTEXT")
             .type(String.class)
-            .choices("PLAINTEXT", "SSL")
+            .choices("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL")
             .metavar("SECURITY-PROTOCOL")
             .dest("securityProtocol")
             .help("Security protocol to be used while communicating with Kafka brokers.");
@@ -124,6 +125,30 @@ public class VerifiableLog4jAppender {
             .metavar("CONFIG_FILE")
             .help("Log4jAppender config properties file.");
 
+        parser.addArgument("--sasl-kerberos-service-name")
+            .action(store())
+            .required(false)
+            .type(String.class)
+            .metavar("SASL-KERBEROS-SERVICE-NAME")
+            .dest("saslKerberosServiceName")
+            .help("Name of sasl kerberos service.");
+
+        parser.addArgument("--client-jaas-conf-path")
+            .action(store())
+            .required(false)
+            .type(String.class)
+            .metavar("CLIENT-JAAS-CONF-PATH")
+            .dest("clientJaasConfPath")
+            .help("Path of JAAS config file of Kafka client.");
+
+        parser.addArgument("--kerb5-conf-path")
+            .action(store())
+            .required(false)
+            .type(String.class)
+            .metavar("KERB5-CONF-PATH")
+            .dest("kerb5ConfPath")
+            .help("Path of Kerb5 config file.");
+
         return parser;
     }
 
@@ -171,11 +196,18 @@ public class VerifiableLog4jAppender {
             props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", res.getString("acks"));
             props.setProperty("log4j.appender.KAFKA.SyncSend", "true");
             final String securityProtocol = res.getString("securityProtocol");
-            if (securityProtocol != null && securityProtocol.equals("SSL")) {
+            if (securityProtocol != null && !securityProtocol.equals(SecurityProtocol.PLAINTEXT.toString())) {
                 props.setProperty("log4j.appender.KAFKA.SecurityProtocol", securityProtocol);
+            }
+            if (securityProtocol != null && securityProtocol.contains("SSL")) {
                 props.setProperty("log4j.appender.KAFKA.SslTruststoreLocation", res.getString("sslTruststoreLocation"));
                 props.setProperty("log4j.appender.KAFKA.SslTruststorePassword", res.getString("sslTruststorePassword"));
             }
+            if (securityProtocol != null && securityProtocol.contains("SASL")) {
+                props.setProperty("log4j.appender.KAFKA.SaslKerberosServiceName", res.getString("saslKerberosServiceName"));
+                props.setProperty("log4j.appender.KAFKA.clientJaasConfPath", res.getString("clientJaasConfPath"));
+                props.setProperty("log4j.appender.KAFKA.kerb5ConfPath", res.getString("kerb5ConfPath"));
+            }
             props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA");
 
             if (configFile != null) {


Mime
View raw message