kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: Extend mirror maker test to include interceptors
Date Fri, 11 Nov 2016 17:10:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0fc1898bf -> 978393446


MINOR: Extend mirror maker test to include interceptors

Author: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2081 from kkonstantine/MINOR-Extend-mirror-maker-test-to-include-interceptors


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/97839344
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/97839344
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/97839344

Branch: refs/heads/trunk
Commit: 97839344601cc67256e871bd461e877eed974104
Parents: 0fc1898
Author: Konstantine Karantasis <konstantine@confluent.io>
Authored: Fri Nov 11 09:10:20 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Nov 11 09:10:20 2016 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/connect.py                   | 13 +++++++++++--
 tests/kafkatest/services/kafka/kafka.py               |  1 +
 .../services/kafka/templates/log4j.properties         |  2 +-
 tests/kafkatest/services/mirror_maker.py              | 14 ++++++++++++--
 .../templates/mirror_maker_consumer.properties        |  8 ++++++++
 .../templates/mirror_maker_producer.properties        |  4 ++++
 6 files changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index bd2c9b9..091eaf7 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -57,6 +57,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
         self.files = files
+        self.environment = {}
 
     def pids(self, node):
         """Return process ids for Kafka Connect processes."""
@@ -208,6 +209,8 @@ class ConnectStandaloneService(ConnectServiceBase):
     def start_cmd(self, node, connector_configs):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
         cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
+        for envvar in self.environment:
+            cmd += "export %s=%s; " % envvar, str(self.environment[envvar])
         cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), self.CONFIG_FILE)
         cmd += " ".join(connector_configs)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
@@ -227,7 +230,9 @@ class ConnectStandaloneService(ConnectServiceBase):
 
         self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
         with node.account.monitor_log(self.LOG_FILE) as monitor:
-            node.account.ssh(self.start_cmd(node, remote_connector_configs))
+            cmd = self.start_cmd(node, remote_connector_configs)
+            self.logger.debug("Connect standalone command: %s", cmd)
+            node.account.ssh(cmd)
             monitor.wait_until('Kafka Connect started', timeout_sec=30, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
@@ -247,6 +252,8 @@ class ConnectDistributedService(ConnectServiceBase):
     def start_cmd(self, node):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
         cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
+        for envvar in self.environment:
+            cmd += "export %s=%s; " % envvar, str(self.environment[envvar])
         cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
         return cmd
@@ -262,7 +269,9 @@ class ConnectDistributedService(ConnectServiceBase):
 
         self.logger.info("Starting Kafka Connect distributed process on " + str(node.account))
         with node.account.monitor_log(self.LOG_FILE) as monitor:
-            node.account.ssh(self.start_cmd(node))
+            cmd = self.start_cmd(node)
+            self.logger.debug("Connect distributed command: %s", cmd)
+            node.account.ssh(cmd)
             monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:

http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 2fe169b..4ac53b1 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -88,6 +88,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.authorizer_class_name = authorizer_class_name
         self.zk_set_acl = False
         self.server_prop_overides = server_prop_overides
+        self.log_level = "DEBUG"
 
         #
         # In a heavily loaded and not very fast machine, it is

http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/kafka/templates/log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/log4j.properties b/tests/kafkatest/services/kafka/templates/log4j.properties
index 8098f3b..7a2a8dc 100644
--- a/tests/kafkatest/services/kafka/templates/log4j.properties
+++ b/tests/kafkatest/services/kafka/templates/log4j.properties
@@ -111,7 +111,7 @@ log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaInfoAppender, kafkaDebugAppend
 log4j.logger.kafka.perf=DEBUG, kafkaInfoAppender, kafkaDebugAppender
 log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaInfoAppender, kafkaDebugAppender
 log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG, kafkaInfoAppender, kafkaDebugAppender
-log4j.logger.kafka=DEBUG, kafkaInfoAppender, kafkaDebugAppender
+log4j.logger.kafka={{ log_level|default("DEBUG") }}, kafkaInfoAppender, kafkaDebugAppender
 
 log4j.logger.kafka.network.RequestChannel$=DEBUG, requestInfoAppender, requestDebugAppender
 log4j.additivity.kafka.network.RequestChannel$=false

http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 7c0601f..734452c 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -73,7 +73,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
 
     def __init__(self, context, num_nodes, source, target, whitelist=None, blacklist=None, num_streams=1,
                  new_consumer=False, consumer_timeout_ms=None, offsets_storage="kafka",
-                 offset_commit_interval_ms=60000):
+                 offset_commit_interval_ms=60000, log_level="DEBUG", producer_interceptor_classes=None):
         """
         MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
 
@@ -91,7 +91,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
             offset_commit_interval_ms:  how frequently the mirror maker consumer commits offsets
         """
         super(MirrorMaker, self).__init__(context, num_nodes=num_nodes)
-        self.log_level = "DEBUG"
+        self.log_level = log_level
         self.new_consumer = new_consumer
         self.consumer_timeout_ms = consumer_timeout_ms
         self.num_streams = num_streams
@@ -108,11 +108,21 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
             raise Exception("offsets_storage should be 'kafka' or 'zookeeper'. Instead found %s" % self.offsets_storage)
 
         self.offset_commit_interval_ms = offset_commit_interval_ms
+        self.producer_interceptor_classes = producer_interceptor_classes
+        self.external_jars = None
+
+        # These properties are potentially used by third-party tests.
+        self.source_auto_offset_reset = None
+        self.partition_assignment_strategy = None
 
     def start_cmd(self, node):
         cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+        # add external dependencies, for instance for interceptors
+        if self.external_jars is not None:
+            cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % self.external_jars
+            cmd += "export CLASSPATH; "
         cmd += " %s kafka.tools.MirrorMaker" % self.path.script("kafka-run-class.sh", node)
         cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
         cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG

http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/templates/mirror_maker_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/mirror_maker_consumer.properties b/tests/kafkatest/services/templates/mirror_maker_consumer.properties
index 68641ab..0e5b472 100644
--- a/tests/kafkatest/services/templates/mirror_maker_consumer.properties
+++ b/tests/kafkatest/services/templates/mirror_maker_consumer.properties
@@ -21,9 +21,17 @@ zookeeper.connect={{ source.zk.connect_setting() }}
 zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
 {% endif %}
 
+{% if source_auto_offset_reset is defined and source_auto_offset_reset is not none %}
+auto.offset.reset={{ source_auto_offset_reset|default('latest') }}
+{% endif %}
+
 group.id={{ group_id|default('test-consumer-group') }}
 offsets.storage={{ offsets_storage }}
 
 {% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
 consumer.timeout.ms={{ consumer_timeout_ms }}
 {% endif %}
+
+{% if partition_assignment_strategy is defined and partition_assignment_strategy is not none %}
+partition.assignment.strategy={{ partition_assignment_strategy }}
+{% endif %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/97839344/tests/kafkatest/services/templates/mirror_maker_producer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/mirror_maker_producer.properties b/tests/kafkatest/services/templates/mirror_maker_producer.properties
index ff50b50..01cb75f 100644
--- a/tests/kafkatest/services/templates/mirror_maker_producer.properties
+++ b/tests/kafkatest/services/templates/mirror_maker_producer.properties
@@ -19,3 +19,7 @@ bootstrap.servers = {{ target.bootstrap_servers(security_config.security_protoco
 producer.type={{ producer_type|default("async") }}  # sync or async
 compression.codec=none
 serializer.class=kafka.serializer.DefaultEncoder
+
+{% if producer_interceptor_classes is defined and producer_interceptor_classes is not none %}
+interceptor.classes={{ producer_interceptor_classes }}
+{% endif %}


Mime
View raw message