kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Added simple streams benchmark to system tests
Date Mon, 18 Jul 2016 19:17:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fbc518554 -> 61c568d83


MINOR: Added simple streams benchmark to system tests

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Geoff Anderson, Guozhang Wang, Ismael Juma

Closes #1621 from enothereska/simple-benchmark-streams-system-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/61c568d8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/61c568d8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/61c568d8

Branch: refs/heads/trunk
Commit: 61c568d8391de6fceb6c8d6a33d349def8d2ada8
Parents: fbc5185
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Mon Jul 18 12:16:58 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jul 18 12:16:58 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/perf/SimpleBenchmark.java     |  14 +-
 .../streams/streams_simple_benchmark_test.py    |  43 ++++++
 .../services/performance/streams_performance.py | 132 +++++++++++++++++++
 3 files changed, 184 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/61c568d8/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index a92fb1b..93bb571 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -74,15 +74,19 @@ public class SimpleBenchmark {
     }
 
     public static void main(String[] args) throws Exception {
-        final File stateDir = new File("/tmp/kafka-streams-simple-benchmark");
-        stateDir.mkdir();
+        String kafka = args.length > 0 ? args[0] : "localhost:9092";
+        String zookeeper = args.length > 1 ? args[1] : "localhost:2181";
+        String stateDirStr = args.length > 2 ? args[2] : "/tmp/kafka-streams-simple-benchmark";
 
+        final File stateDir = new File(stateDirStr);
+        stateDir.mkdir();
         final File rocksdbDir = new File(stateDir, "rocksdb-test");
         rocksdbDir.mkdir();
 
-
-        final String kafka = "localhost:9092";
-        final String zookeeper = "localhost:2181";
+        System.out.println("SimpleBenchmark instance started");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
 
         SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/61c568d8/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
new file mode 100644
index 0000000..5eb2663
--- /dev/null
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import ignore
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
+import time
+
+class StreamsSimpleBenchmarkTest(KafkaTest):
+    """
+    Simple benchmark of Kafka Streams.
+    """
+
+    def __init__(self, test_context):
+        super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1)
+
+        self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka)
+
+    def test_simple_benchmark(self):
+        """
+        Run simple Kafka Streams benchmark
+        """
+
+        self.driver.start()
+        self.driver.wait()
+        self.driver.stop()
+        node = self.driver.node
+        node.account.ssh("grep Performance %s" % self.driver.STDOUT_FILE, allow_fail=False)
+
+        return self.driver.collect_data(node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/61c568d8/tests/kafkatest/services/performance/streams_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
new file mode 100644
index 0000000..8002bbe
--- /dev/null
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os.path
+import signal
+
+from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+
+#
+# Class used to start the simple Kafka Streams benchmark
+#
+class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
+    """Base class for simple Kafka Streams benchmark"""
+
+    PERSISTENT_ROOT = "/mnt/streams"
+    # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
+    LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
+    STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
+    STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
+    LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
+
+    logs = {
+        "streams_log": {
+            "path": LOG_FILE,
+            "collect_default": True},
+        "streams_stdout": {
+            "path": STDOUT_FILE,
+            "collect_default": True},
+        "streams_stderr": {
+            "path": STDERR_FILE,
+            "collect_default": True},
+    }
+
+    def __init__(self, context, kafka):
+        super(StreamsSimpleBenchmarkService, self).__init__(context, 1)
+        self.kafka = kafka
+
+    @property
+    def node(self):
+        return self.nodes[0]
+
+    def pids(self, node):
+        try:
+            return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)]
+        except:
+            return []
+
+    def stop_node(self, node, clean_shutdown=True):
+        self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping SimpleBenchmark on " + str(node.account))
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        for pid in pids:
+            node.account.signal(pid, sig, allow_fail=True)
+        if clean_shutdown:
+            for pid in pids:
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")
+
+        node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
+
+    def wait(self):
+        for node in self.nodes:
+            for pid in self.pids(node):
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=600, err_msg="SimpleBenchmark process on " + str(node.account) + " took too long to exit")
+
+    def clean_node(self, node):
+        node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+
+    def start_cmd(self, node):
+        args = {}
+        args['kafka'] = self.kafka.bootstrap_servers()
+        args['zk'] = self.kafka.zk.connect_setting()
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.perf.SimpleBenchmark " \
+              " %(kafka)s %(zk)s %(state_dir)s " \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
+
+    def start_node(self, node):
+        node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+
+        node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
+
+        self.logger.info("Starting SimpleBenchmark process on " + str(node.account))
+        results = {}
+        with node.account.monitor_log(self.STDOUT_FILE) as monitor:
+            node.account.ssh(self.start_cmd(node))
+            monitor.wait_until('SimpleBenchmark instance started', timeout_sec=15, err_msg="Never saw message indicating SimpleBenchmark finished startup on " + str(node.account))
+
+        if len(self.pids(node)) == 0:
+            raise RuntimeError("No process ids recorded")
+
+    def collect_data(self, node):
+        # Collect the data and return it to the framework
+        output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
+        data = []
+        data.append('{')
+        for line in output:
+            parts = line.split(':')
+            data.append('\'')
+            data.append(parts[0])
+            data.append('\'')
+            data.append(':')
+            data.append(parts[1]);
+        data.append('}')
+        data = ''.join(data)
+        return data


Mime
View raw message