kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7834: Extend collected logs in system test services to include heap dumps
Date Wed, 20 Feb 2019 02:18:28 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 90fb79b  KAFKA-7834: Extend collected logs in system test services to include heap dumps
90fb79b is described below

commit 90fb79b4c17b242b83288bcabe06466347f5141f
Author: Konstantine Karantasis <konstantine@confluent.io>
AuthorDate: Mon Feb 4 16:46:03 2019 -0800

    KAFKA-7834: Extend collected logs in system test services to include heap dumps
    
    * Enable heap dumps on OOM with -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=<file.bin> in the major services in system tests
    * Collect the heap dump from the predefined location as part of the result logs for each service
    * Change Connect service to delete the whole root directory instead of individual expected files
    * Tested by running the full suite of system tests
    
    Author: Konstantine Karantasis <konstantine@confluent.io>
    
    Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #6158 from kkonstantine/KAFKA-7834
---
 tests/kafkatest/services/connect.py     | 26 ++++++++++++++++++++++----
 tests/kafkatest/services/kafka/kafka.py | 11 +++++++++--
 tests/kafkatest/services/zookeeper.py   | 10 ++++++++--
 3 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index bf38e50..40c2cf3 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -42,6 +42,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid")
     EXTERNAL_CONFIGS_FILE = os.path.join(PERSISTENT_ROOT, "connect-external-configs.properties")
     CONNECT_REST_PORT = 8083
+    HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "connect_heap_dump.bin")
 
     # Currently the Connect worker supports waiting on three modes:
     STARTUP_MODE_INSTANT = 'INSTANT'
@@ -61,6 +62,9 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         "connect_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "connect_heap_dump_file": {
+            "path": HEAP_DUMP_FILE,
+            "collect_default": True}
     }
 
     def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
@@ -160,8 +164,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def clean_node(self, node):
         node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
         self.security_config.clean_node(node)
-        all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE,
self.STDOUT_FILE, self.STDERR_FILE, self.EXTERNAL_CONFIGS_FILE] + self.config_filenames()
+ self.files)
-        node.account.ssh("rm -rf " + all_files, allow_fail=False)
+        other_files = " ".join(self.config_filenames() + self.files)
+        node.account.ssh("rm -rf -- %s %s" % (ConnectServiceBase.PERSISTENT_ROOT, other_files),
allow_fail=False)
 
     def config_filenames(self):
         return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties")
for idx, template in enumerate(self.connector_config_templates or [])]
@@ -252,6 +256,14 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def _base_url(self, node):
         return 'http://' + node.account.externally_routable_ip + ':' + str(self.CONNECT_REST_PORT)
 
+    def append_to_environment_variable(self, envvar, value):
+        env_opts = self.environment[envvar]
+        if env_opts is None:
+            env_opts = "\"%s\"" % value
+        else:
+            env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
+        self.environment[envvar] = env_opts
+
 
 class ConnectStandaloneService(ConnectServiceBase):
     """Runs Kafka Connect in standalone mode."""
@@ -266,7 +278,10 @@ class ConnectStandaloneService(ConnectServiceBase):
 
     def start_cmd(self, node, connector_configs):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
-        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
+        heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
+                          self.logs["connect_heap_dump_file"]["path"]
+        other_kafka_opts = self.security_config.kafka_opts.strip('\"')
+        cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
         cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), self.CONFIG_FILE)
@@ -314,7 +329,10 @@ class ConnectDistributedService(ConnectServiceBase):
     # connector_configs argument is intentionally ignored in distributed service.
     def start_cmd(self, node, connector_configs):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
-        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
+        heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
+                          self.logs["connect_heap_dump_file"]["path"]
+        other_kafka_opts = self.security_config.kafka_opts.strip('\"')
+        cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
         cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE)
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 2258e27..a59bb71 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -49,6 +49,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
     # Kafka Authorizer
     SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
+    HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
 
     logs = {
         "kafka_server_start_stdout_stderr": {
@@ -65,7 +66,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             "collect_default": False},
         "kafka_data_2": {
             "path": DATA_LOG_DIR_2,
-            "collect_default": False}
+            "collect_default": False},
+        "kafka_heap_dump_file": {
+            "path": HEAP_DUMP_FILE,
+            "collect_default": True}
     }
 
     def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT,
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
@@ -247,7 +251,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def start_cmd(self, node):
         cmd = "export JMX_PORT=%d; " % self.jmx_port
         cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG
-        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
+        heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
+                          self.logs["kafka_heap_dump_file"]["path"]
+        other_kafka_opts = self.security_config.kafka_opts.strip('\"')
+        cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
         cmd += "%s %s 1>> %s 2>> %s &" % \
                (self.path.script("kafka-server-start.sh", node),
                 KafkaService.CONFIG_FILE,
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 5bda867..f6a6b02 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -30,6 +30,7 @@ from kafkatest.version import DEV_BRANCH
 class ZookeeperService(KafkaPathResolverMixin, Service):
     ROOT = "/mnt/zookeeper"
     DATA = os.path.join(ROOT, "data")
+    HEAP_DUMP_FILE = os.path.join(ROOT, "zk_heap_dump.bin")
 
     logs = {
         "zk_log": {
@@ -37,7 +38,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
             "collect_default": True},
         "zk_data": {
             "path": DATA,
-            "collect_default": False}
+            "collect_default": False},
+        "zk_heap_dump_file": {
+            "path": HEAP_DUMP_FILE,
+            "collect_default": True}
     }
 
     def __init__(self, context, num_nodes, zk_sasl = False):
@@ -76,8 +80,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         self.logger.info(config_file)
         node.account.create_file("%s/zookeeper.properties" % ZookeeperService.ROOT, config_file)
 
-        start_cmd = "export KAFKA_OPTS=\"%s\";" % (self.kafka_opts + ' ' + self.security_system_properties)
\
+        heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % self.logs["zk_heap_dump_file"]["path"]
+        other_kafka_opts = self.kafka_opts + ' ' + self.security_system_properties \
             if self.security_config.zk_sasl else self.kafka_opts
+        start_cmd = "export KAFKA_OPTS=\"%s %s\";" % (heap_kafka_opts, other_kafka_opts)
         start_cmd += "%s " % self.path.script("zookeeper-server-start.sh", node)
         start_cmd += "%s/zookeeper.properties &>> %s &" % (ZookeeperService.ROOT,
self.logs["zk_log"]["path"])
         node.account.ssh(start_cmd)


Mime
View raw message