kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1376147 [1/2] - in /incubator/kafka/branches/0.8: config/ system_test/ system_test/replication_testsuite/ system_test/replication_testsuite/config/ system_test/replication_testsuite/testcase_1/ system_test/utils/
Date Wed, 22 Aug 2012 17:16:27 GMT
Author: nehanarkhede
Date: Wed Aug 22 17:16:26 2012
New Revision: 1376147

URL: http://svn.apache.org/viewvc?rev=1376147&view=rev
Log:
KAFKA-440 Regression/system test framework; patched by John Fung; reviewed by Neha Narkhede

Added:
    incubator/kafka/branches/0.8/system_test/README.txt
    incubator/kafka/branches/0.8/system_test/__init__.py
    incubator/kafka/branches/0.8/system_test/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/__init__.py
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/console_consumer.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/consumer.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/log4j.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/zookeeper.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
    incubator/kafka/branches/0.8/system_test/system_test_env.py
    incubator/kafka/branches/0.8/system_test/system_test_runner.py
    incubator/kafka/branches/0.8/system_test/utils/
    incubator/kafka/branches/0.8/system_test/utils/__init__.py
    incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
    incubator/kafka/branches/0.8/system_test/utils/setup_utils.py
    incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
    incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
Modified:
    incubator/kafka/branches/0.8/config/log4j.properties

Modified: incubator/kafka/branches/0.8/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/config/log4j.properties?rev=1376147&r1=1376146&r2=1376147&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/config/log4j.properties (original)
+++ incubator/kafka/branches/0.8/config/log4j.properties Wed Aug 22 17:16:26 2012
@@ -28,3 +28,5 @@ log4j.appender.stdout.layout.ConversionP
 #log4j.logger.kafka=INFO
 #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
 
+log4j.logger.kafka.perf=DEBUG
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG 

Added: incubator/kafka/branches/0.8/system_test/README.txt
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/README.txt?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/README.txt (added)
+++ incubator/kafka/branches/0.8/system_test/README.txt Wed Aug 22 17:16:26 2012
@@ -0,0 +1,80 @@
+# ==========================
+# Known Issues:
+# ==========================
+1. The "broker-list" in system_test/replication_testsuite/testcase_1/testcase_1_properties.json needs to be manually updated to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
+2. Sometimes the running processes may not be terminated properly by the script.
+
+
+# ==========================
+# Overview
+# ==========================
+
+"system_test" is now transformed to a system regression test framework intended for the automation of system / integration testing of data platform software such as Kafka. The test framework is implemented in Python which is a popular scripting language with well supported features.
+
+The framework has the following levels:
+
+1. The first level is generic and does not depend on any product specific details.
+   location: system_test
+   a. system_test_runner.py - It implements the main class RegTest as an entry point.
+   b. system_test_env.py    - It implements the class RegTestEnv which defines the testing environment of a test session such as the base directory and environment variables specific to the local machine.
+
+2. The second level defines a suite of testing such as Kafka's replication (including basic testing, failure testing, ... etc)
+   location: system_test/<suite directory name>*.
+
+   * Please note the test framework will look for a specific suffix of the directories under system_test to determine what test suites are available. The suffix of <suite directory name> can be defined in SystemTestEnv class (system_test_env.py)
+
+   a. replica_basic_test.py - This is a test module file. It implements the test logic for basic replication testing as follows:
+
+       i.    start zookeepers
+       ii.   start brokers
+       iii.  create kafka topics
+       iv.   lookup the brokerid as a leader
+       v.    terminate the leader (if defined in the testcase config json file)
+       vi.   start producer to send n messages
+       vii.  start consumer to receive messages
+       viii. validate if there is data loss
+
+   b. config/ - This config directory provides templates for all properties files needed for zookeeper, brokers, producer and consumer (any changes in the files under this directory would be reflected or overwritten by the settings under testcase_<n>/testcase_<n>_properties.json)
+
+   d. testcase_<n>** - The testcase directory contains the testcase argument definition file: testcase_1_properties.json. This file defines the specific configurations for the testcase such as the followings (eg. producer related):
+      i.   no. of producer threads
+      ii.  no. of messages to produce
+      iii. zkconnect string
+      
+      When this test case is being run, the test framework will copy and update the template properties files to testcase_<n>/config. The logs of various components will be saved in testcase_<n>/logs
+
+   ** Please note the test framework will look for a specific prefix of the directories under system_test/<test suite dir>/ to determine what test cases are available. The prefix of <testcase directory name> can be defined in SystemTestEnv class (system_test_env.py)
+
+# ==========================
+# Quick Start
+# ==========================
+
+* Please note that the following commands should be executed after downloading the kafka source code to build all the required binaries:
+  1. <kafka install dir>/ $ ./sbt update package
+
+  Now you are ready to follow the steps below.
+  1. Update system_test/cluster_config.json for "kafka_home" & "java_home" specific to your environment
+  2. Edit system_test/replication_testsuite/testcase_1/testcase_1_properties.json and update "broker-list" to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
+  3. To run the test, go to <kafka_home>/system_test and run the following command:
+     $ python -B system_test_runner.py 
+
+
+# ==========================
+# Adding Test Case
+# ==========================
+
+To create a new test suite called "broker_testsuite", please do the followings:
+
+  1. Copy and paste system_test/replication_testsuite => system_test/broker_testsuite
+  2. Rename system_test/broker_testsuite/replica_basic_test.py => system_test/broker_testsuite/broker_basic_test.py
+  3. Edit system_test/broker_testsuite/broker_basic_test.py and update all ReplicaBasicTest related class name to BrokerBasicTest (as an example)
+  4. Follow the flow of system_test/broker_testsuite/broker_basic_test.py and modify the necessary test logic accordingly.
+
+
+To create a new test case under "replication_testsuite", please do the followings:
+
+  1. Copy and paste system_test/replication_testsuite/testcase_1 => system_test/replication_testsuite/testcase_2
+  2. Rename system_test/replication_testsuite/testcase_2/testcase_1_properties.json => system_test/replication_testsuite/testcase_2/testcase_2_properties.json
+  3. Update system_test/replication_testsuite/testcase_2/testcase_2_properties.json with the corresponding settings for testcase 2.
+
+

Added: incubator/kafka/branches/0.8/system_test/__init__.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/__init__.py?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/__init__.py (added)
+++ incubator/kafka/branches/0.8/system_test/__init__.py Wed Aug 22 17:16:26 2012
@@ -0,0 +1 @@
+ 

Added: incubator/kafka/branches/0.8/system_test/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/cluster_config.json?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/cluster_config.json (added)
+++ incubator/kafka/branches/0.8/system_test/cluster_config.json Wed Aug 22 17:16:26 2012
@@ -0,0 +1,52 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "jmx_port": "9990"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "jmx_port": "9991"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "jmx_port": "9992"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "jmx_port": "9993"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "jmx_port": "9994"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "kafka_home": "/home/nnarkhed/Projects/kafka-440",
+            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "jmx_port": "9995"
+        }
+    ]
+}

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/__init__.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/__init__.py?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/__init__.py (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/__init__.py Wed Aug 22 17:16:26 2012
@@ -0,0 +1 @@
+ 

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/console_consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/console_consumer.properties?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/console_consumer.properties (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/console_consumer.properties Wed Aug 22 17:16:26 2012
@@ -0,0 +1,4 @@
+zookeeper=local:2181
+topic=test_1
+from-beginning
+consumer-timeout-ms=10000

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/consumer.properties?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/consumer.properties (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/consumer.properties Wed Aug 22 17:16:26 2012
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=127.0.0.1:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=test-consumer-group
+
+#consumer timeout
+#consumer.timeout.ms=5000

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/log4j.properties?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/log4j.properties (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/log4j.properties Wed Aug 22 17:16:26 2012
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+#log4j.appender.fileAppender=org.apache.log4j.FileAppender
+#log4j.appender.fileAppender.File=kafka-request.log
+#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
+
+
+# Turn on all our debugging info
+#log4j.logger.kafka=INFO
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+
+# to print message checksum from ProducerPerformance
+log4j.logger.kafka.perf=DEBUG
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+
+# to print message checksum from ProducerPerformance
+log4j.logger.kafka.perf=DEBUG
+log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer.properties?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer.properties (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer.properties Wed Aug 22 17:16:26 2012
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.producer.ProducerConfig for more details
+
+############################# Producer Basics #############################
+
+# need to set either broker.list or zk.connect
+
+# configure brokers statically
+# format: brokerid1:host1:port1,brokerid2:host2:port2 ...
+#broker.list=0:localhost:9092
+
+# discover brokers from ZK
+zk.connect=localhost:2181
+
+# zookeeper session timeout; default is 6000
+#zk.sessiontimeout.ms=
+
+# the max time that the client waits to establish a connection to zookeeper; default is 6000
+#zk.connectiontimeout.ms
+
+# name of the partitioner class for partitioning events; default partition spreads data randomly
+#partitioner.class=
+
+# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
+producer.type=sync
+
+# specify the compression codec for all data generated: 0: no compression, 1: gzip
+compression.codec=0
+
+# message encoder
+serializer.class=kafka.serializer.StringEncoder
+
+# allow topic level compression
+#compressed.topics=
+
+# max message size; messages larger than that size are discarded; default is 1000000
+#max.message.size=
+
+
+############################# Async Producer #############################
+# maximum time, in milliseconds, for buffering data on the producer queue 
+#queue.time=
+
+# the maximum size of the blocking queue for buffering on the producer 
+#queue.size=
+
+# Timeout for event enqueue:
+# 0: events will be enqueued immediately or dropped if the queue is full
+# -ve: enqueue will block indefinitely if the queue is full
+# +ve: enqueue will block up to this many milliseconds if the queue is full
+#queue.enqueueTimeout.ms=
+
+# the number of messages batched at the producer 
+#batch.size=
+
+# the callback handler for one or multiple events 
+#callback.handler=
+
+# properties required to initialize the callback handler 
+#callback.handler.props=
+
+# the handler for events 
+#event.handler=
+
+# properties required to initialize the event handler 
+#event.handler.props=
+

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties Wed Aug 22 17:16:26 2012
@@ -0,0 +1,7 @@
+broker-list=localhost:2181
+topic=mytest
+messages=200
+message-size=100
+thread=5
+initial-message-id=0
+compression-codec=0

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/server.properties Wed Aug 22 17:16:26 2012
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+brokerid=0
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
+# may not be what you want.
+#hostname=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9091
+
+# The number of threads handling network requests
+network.threads=2
+ 
+# The number of threads doing disk I/O
+io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+max.socket.request.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_logs
+
+# The number of logical partitions per topic per server. More partitions allow greater parallelism
+# for consumption, but also mean more files.
+num.partitions=5
+
+# Overrides for for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.default.flush.interval.ms=1000
+
+# Per-topic overrides for log.default.flush.interval.ms
+#topic.flush.intervals.ms=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.default.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.size.
+#log.retention.size=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.file.size=536870912
+#log.file.size=102400
+log.file.size=128
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zk.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/zookeeper.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/zookeeper.properties?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/zookeeper.properties (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/zookeeper.properties Wed Aug 22 17:16:26 2012
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Wed Aug 22 17:16:26 2012
@@ -0,0 +1,256 @@
+#!/usr/bin/env python
+
+# ===================================
+# replica_basic_test.py
+# ===================================
+
+import inspect
+import logging
+import os
+import signal
+import subprocess
+import sys
+import time
+
+from   system_test_env    import SystemTestEnv
+sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
+from   setup_utils        import SetupUtils
+import system_test_utils
+from   testcase_env       import TestcaseEnv
+
+# product specific: Kafka
+import kafka_system_test_utils
+
+class ReplicaBasicTest(SetupUtils):
+
+    testModuleAbsPathName = os.path.realpath(__file__)
+    testSuiteAbsPathName  = os.path.abspath(os.path.dirname(testModuleAbsPathName))
+    isLeaderLogPattern    = "Completed the leader state transition"
+
+    def __init__(self, systemTestEnv):
+
+        # SystemTestEnv - provides cluster level environment settings
+        #     such as entity_id, hostname, kafka_home, java_home which
+        #     are available in a list of dictionary named 
+        #     "clusterEntityConfigDictList"
+        self.systemTestEnv = systemTestEnv
+
+        # dict to pass user-defined attributes to logger argument: "extra"
+        d = {'name_of_class': self.__class__.__name__}
+
+    def signal_handler(self, signal, frame):
+        self.log_message("Interrupt detected - User pressed Ctrl+c")
+
+        for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
+            kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, self.testcaseEnv, entityId, parentPid)
+
+        sys.exit(1) 
+
+    def runTest(self):
+
+        # get all testcase directories under this testsuite
+        testCasePathNameList = system_test_utils.get_dir_paths_with_prefix(
+            self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
+        testCasePathNameList.sort()
+
+        # =============================================================
+        # launch each testcase one by one: testcase_1, testcase_2, ...
+        # =============================================================
+        for testCasePathName in testCasePathNameList:
+   
+            try: 
+                # create a new instance of TestcaseEnv to keep track of this testcase's environment variables
+                self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
+                self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
+    
+                # initialize self.testcaseEnv with user-defined environment
+                self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = \
+                    ReplicaBasicTest.isLeaderLogPattern
+                self.testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"]  = \
+                    "\[(.*?)\] .* Broker (.*?): " + \
+                    self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + \
+                    " for topic (.*?) partition (.*?) \(.*"
+                self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
+    
+                # find testcase properties json file
+                testcasePropJsonPathName = system_test_utils.get_testcase_prop_json_pathname(testCasePathName)
+                self.logger.debug("testcasePropJsonPathName : " + testcasePropJsonPathName, extra=self.d)
+    
+                # get the dictionary that contains the testcase arguments and description
+                testcaseNonEntityDataDict = system_test_utils.get_json_dict_data(testcasePropJsonPathName)
+    
+                testcaseDirName = os.path.basename(testCasePathName)
+                self.testcaseEnv.testcaseResultsDict["test_case_name"] = testcaseDirName
+    
+                #### => update testcaseEnv
+                self.testcaseEnv.testCaseBaseDir = testCasePathName
+                self.testcaseEnv.testCaseLogsDir = self.testcaseEnv.testCaseBaseDir + "/logs"
+    
+                # get testcase description
+                testcaseDescription = ""
+                for k,v in testcaseNonEntityDataDict.items():
+                    if ( k == "description" ): testcaseDescription = v
+    
+                #### => update testcaseEnv
+                # TestcaseEnv.testcaseArgumentsDict initialized, this dictionary keeps track of the
+                # "testcase_args" in the testcase_properties.json such as replica_factor, num_partition, ...
+                self.testcaseEnv.testcaseArgumentsDict = testcaseNonEntityDataDict["testcase_args"]
+    
+                # =================================================================
+                # TestcaseEnv environment settings initialization are completed here
+                # =================================================================
+                # self.testcaseEnv.systemTestBaseDir
+                # self.testcaseEnv.testSuiteBaseDir
+                # self.testcaseEnv.testCaseBaseDir
+                # self.testcaseEnv.testCaseLogsDir
+                # self.testcaseEnv.testcaseArgumentsDict
+    
+                # display testcase name and arguments
+                self.log_message("Test Case : " + testcaseDirName)
+                for k,v in self.testcaseEnv.testcaseArgumentsDict.items():
+                    self.anonLogger.info("    " + k + " : " + v)
+                self.log_message("Description : " + testcaseDescription)
+    
+    
+                # ================================================================ #
+                # ================================================================ #
+                #            Product Specific Testing Code Starts Here:            #
+                # ================================================================ #
+                # ================================================================ #
+    
+                # initialize signal handler
+                signal.signal(signal.SIGINT, self.signal_handler)
+    
+                # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase
+                # for collecting logs from remote machines
+                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
+    
+                # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file:
+                #   system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(testcasePropJsonPathName)
+    
+                # TestcaseEnv - initialize producer & consumer config / log file pathnames
+                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
+    
+                # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
+                kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+    
+                # generate remote hosts log/config dirs if not exist
+                kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+    
+                # generate properties files for zookeeper, kafka, producer, consumer:
+                # 1. copy system_test/<suite_name>_testsuite/config/*.properties to 
+                #    system_test/<suite_name>_testsuite/testcase_<n>/config/
+                # 2. update all properties files in system_test/<suite_name>_testsuite/testcase_<n>/config
+                #    by overriding the settings specified in:
+                #    system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName, self.testcaseEnv, self.systemTestEnv)
+    
+                # =============================================
+                # preparing all entities to start the test
+                # =============================================
+                self.log_message("starting zookeepers")
+                kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 2s")
+                time.sleep(2)
+    
+                self.log_message("starting brokers")
+                kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+    
+                self.log_message("creating topics")
+                kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+    
+                self.log_message("looking up leader")
+                leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, self.testcaseEnv)
+    
+                # ==========================
+                # leaderDict looks like this:
+                # ==========================
+                #{'entity_id': u'3',
+                # 'partition': '0',
+                # 'timestamp': 1345050255.8280001,
+                # 'hostname': u'localhost',
+                # 'topic': 'test_1',
+                # 'brokerid': '3'}
+    
+                # validate to see if leader election is successful
+                self.log_message("validating leader election")
+                result = kafka_system_test_utils.validate_leader_election_successful( \
+                             self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
+    
+                # checking to see if leader bouncing is required in this testcase
+                bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
+                self.log_message("bounce_leader flag : " + bounceLeaderFlag)
+    
+                if (bounceLeaderFlag.lower() == "true"):
+                    if self.testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
+                        # no leader available for testing => skip this round
+                        self.log_message("stopping all entities")
+                        for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
+                            kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+    
+                        continue
+                    else:
+                        # leader elected => stop leader
+                        try:
+                            leaderEntityId = leaderDict["entity_id"]
+                            leaderBrokerId = leaderDict["brokerid"]
+                            leaderPPid     = self.testcaseEnv.entityParentPidDict[leaderEntityId]
+                        except:
+                            self.log_message("leader details unavailable")
+    
+                        self.log_message("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid)
+                 
+                        kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, leaderEntityId, leaderPPid)
+                        self.testcaseEnv.entityParentPidDict[leaderEntityId] = ""
+    
+                    self.logger.info("sleeping for 5s for leader re-election to complete", extra=self.d)
+                    time.sleep(5)
+    
+                # starting producer 
+                self.log_message("starting producer")
+                kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+    
+                # starting previously terminated broker 
+                if (bounceLeaderFlag.lower() == "true" and not self.testcaseEnv.entityParentPidDict[leaderEntityId]):
+                    self.log_message("starting the previously terminated broker")
+    
+                    stoppedLeaderEntityId = leaderDict["entity_id"]
+                    kafka_system_test_utils.start_entity_in_background(
+                        self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
+    
+                    self.anonLogger.info("sleeping for 5s")
+                    time.sleep(5)
+
+                # starting consumer
+                self.log_message("starting consumer")
+                kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
+    
+                # this testcase is completed - so stopping all entities
+                self.log_message("stopping all entities")
+                for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+    
+                # validate the data matched
+                self.log_message("validating data matched")
+                result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
+    
+                # =============================================
+                # collect logs from remote hosts
+                # =============================================
+                kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+    
+            except Exception as e:
+                self.log_message("Exception caught : ")
+                print e
+                self.log_message("stopping all entities")
+                for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+ 

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json Wed Aug 22 17:16:26 2012
@@ -0,0 +1,63 @@
+{
+  "description": "Basic test to produce and consume messages to a single topic partition. This test sends messages to n replicas and at the end verifies the log size and contents as well as using a consumer to verify no message loss. Optionally, the test bounces the leader periodically to introduce failures during the message replication.",
+  "testcase_args": {
+    "bounce_leader": "true",
+    "replica_factor": "3",
+    "num_partition": "2"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "brokerid": "1",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "brokerid": "2",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "brokerid": "3",
+      "log.file.size": "10240",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "500",
+      "broker-list": "localhost:9091,localhost:9092,localhost:9093",
+      "log_filename": "producer_performance.log",
+      "config_filename": "producer_performance.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_1",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer.properties"
+    }
+  ]
+}

Added: incubator/kafka/branches/0.8/system_test/system_test_env.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/system_test_env.py?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/system_test_env.py (added)
+++ incubator/kafka/branches/0.8/system_test/system_test_env.py Wed Aug 22 17:16:26 2012
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+
+# ===================================
+# system_test_env.py
+# ===================================
+
+import json
+import os
+import sys
+
+class SystemTestEnv():
+
+    # private:
+    _cwdFullPath              = os.getcwd()
+    _thisScriptFullPathName   = os.path.realpath(__file__)
+    _thisScriptBaseDir        = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0])))
+
+    # public:
+    SYSTEM_TEST_BASE_DIR      = os.path.abspath(_thisScriptBaseDir)
+    SYSTEM_TEST_UTIL_DIR      = os.path.abspath(SYSTEM_TEST_BASE_DIR + "/utils")
+    SYSTEM_TEST_SUITE_SUFFIX  = "_testsuite"
+    SYSTEM_TEST_CASE_PREFIX   = "testcase_"
+    SYSTEM_TEST_MODULE_EXT    = ".py"
+    CLUSTER_CONFIG_FILENAME   = "cluster_config.json"
+    CLUSTER_CONFIG_PATHNAME   = os.path.abspath(SYSTEM_TEST_BASE_DIR + "/" + CLUSTER_CONFIG_FILENAME)
+
+    clusterEntityConfigDictList  = []
+    systemTestResultsList        = []
+
+    def __init__(self):
+        "Create an object with this system test session environment"
+
+        # retrieve each entity's data from cluster config json file
+        # as "dict" and enter them into a "list"
+        jsonFileContent = open(self.CLUSTER_CONFIG_PATHNAME, "r").read()
+        jsonData        = json.loads(jsonFileContent)
+        for key, cfgList in jsonData.items():
+            if key == "cluster_config":
+                for cfg in cfgList:
+                    self.clusterEntityConfigDictList.append(cfg)
+
+
+    def getSystemTestEnvDict(self):
+        envDict = {}
+        envDict["system_test_base_dir"]             = self.SYSTEM_TEST_BASE_DIR
+        envDict["system_test_util_dir"]             = self.SYSTEM_TEST_UTIL_DIR
+        envDict["cluster_config_pathname"]          = self.CLUSTER_CONFIG_PATHNAME
+        envDict["system_test_suite_suffix"]         = self.SYSTEM_TEST_SUITE_SUFFIX
+        envDict["system_test_case_prefix"]          = self.SYSTEM_TEST_CASE_PREFIX
+        envDict["system_test_module_ext"]           = self.SYSTEM_TEST_MODULE_EXT
+        envDict["cluster_config_pathname"]          = self.CLUSTER_CONFIG_PATHNAME
+        envDict["cluster_entity_config_dict_list"]  = self.clusterEntityConfigDictList
+        envDict["system_test_results_list"]         = self.systemTestResultsList
+        return envDict
+
+

Added: incubator/kafka/branches/0.8/system_test/system_test_runner.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/system_test_runner.py?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/system_test_runner.py (added)
+++ incubator/kafka/branches/0.8/system_test/system_test_runner.py Wed Aug 22 17:16:26 2012
@@ -0,0 +1,164 @@
+#!/usr/bin/evn python
+
+# ===================================
+# system_test_runner.py
+# ===================================
+
+from system_test_env import SystemTestEnv
+from utils import system_test_utils
+
+import logging
+import os
+import sys
+
+
+# ====================================================================
+# Two logging formats are defined in system_test/system_test_runner.py
+# ====================================================================
+
+# 1. "namedLogger" is defined to log message in this format:
+#    "%(asctime)s - %(levelname)s - %(message)s %(name_of_class)s"
+# 
+# usage: to log message and showing the class name of the message
+
+namedLogger = logging.getLogger("namedLogger")
+namedLogger.setLevel(logging.INFO)
+#namedLogger.setLevel(logging.DEBUG)
+namedConsoleHandler = logging.StreamHandler()
+namedConsoleHandler.setLevel(logging.DEBUG)
+namedFormatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s %(name_of_class)s")
+namedConsoleHandler.setFormatter(namedFormatter)
+namedLogger.addHandler(namedConsoleHandler)
+
+# 2. "anonymousLogger" is defined to log message in this format:
+#    "%(asctime)s - %(levelname)s - %(message)s"
+# 
+# usage: to log message without showing class name and it's appropriate
+#        for logging generic message such as "sleeping for 5 seconds"
+
+anonymousLogger = logging.getLogger("anonymousLogger")
+anonymousLogger.setLevel(logging.INFO)
+#anonymousLogger.setLevel(logging.DEBUG)
+anonymousConsoleHandler = logging.StreamHandler()
+anonymousConsoleHandler.setLevel(logging.DEBUG)
+anonymousFormatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+anonymousConsoleHandler.setFormatter(anonymousFormatter)
+anonymousLogger.addHandler(anonymousConsoleHandler)
+
+d = {'name_of_class': '(system_test_runner)'}
+
+def main():
+
+    print
+    print
+    print
+    anonymousLogger.info("=================================================")
+    anonymousLogger.info("        System Regression Test Framework")
+    anonymousLogger.info("=================================================")
+    print
+    print
+
+    testSuiteClassDictList = []
+
+    # SystemTestEnv is a class to provide all environement settings for this session
+    # such as the SYSTEM_TEST_BASE_DIR, SYSTEM_TEST_UTIL_DIR, ...
+    systemTestEnv = SystemTestEnv()
+
+    # sanity check on remote hosts to make sure:
+    # - all directories (eg. java_home) specified in cluster_config.json exists in all hosts
+    # - no conflicting running processes in remote hosts
+    anonymousLogger.info("=================================================")
+    anonymousLogger.info("setting up remote hosts ...")
+    anonymousLogger.info("=================================================")
+    if not system_test_utils.setup_remote_hosts(systemTestEnv):
+        namedLogger.error("Remote hosts sanity check failed. Aborting test ...", extra=d)
+        print
+        sys.exit(1)
+    print
+
+    # get all defined names within a module: 
+    definedItemList = dir(SystemTestEnv)
+    anonymousLogger.debug("=================================================")
+    anonymousLogger.debug("SystemTestEnv keys:")
+    for item in definedItemList:
+        anonymousLogger.debug("    " + item)
+    anonymousLogger.debug("=================================================")
+
+    anonymousLogger.info("=================================================")
+    anonymousLogger.info("looking up test suites ...")
+    anonymousLogger.info("=================================================")
+    # find all test suites in SYSTEM_TEST_BASE_DIR
+    for dirName in os.listdir(systemTestEnv.SYSTEM_TEST_BASE_DIR):
+
+        # make sure this is a valid testsuite directory
+        if os.path.isdir(dirName) and dirName.endswith(systemTestEnv.SYSTEM_TEST_SUITE_SUFFIX):
+            
+            namedLogger.info("found a testsuite : " + dirName, extra=d)
+            testModulePathName = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + dirName)
+
+            # go through all test modules file in this testsuite
+            for moduleFileName in os.listdir(testModulePathName):
+
+                # make sure it is a valid test module
+                if moduleFileName.endswith(systemTestEnv.SYSTEM_TEST_MODULE_EXT) \
+                   and not moduleFileName.startswith("__"):
+
+                    # found a test module file
+                    namedLogger.info("found a test module file : " + moduleFileName, extra=d) 
+
+                    testModuleClassName = system_test_utils.sys_call("grep ^class " + testModulePathName + "/" + \
+                                          moduleFileName + " | sed 's/^class //g' | sed 's/(.*):.*//g'")
+                    testModuleClassName = testModuleClassName.rstrip('\n')
+
+                    # collect the test suite class data
+                    testSuiteClassDict           = {}
+                    testSuiteClassDict["suite"]  = dirName
+                    extLenToRemove               = systemTestEnv.SYSTEM_TEST_MODULE_EXT.__len__() * -1 
+                    testSuiteClassDict["module"] = moduleFileName[:extLenToRemove]
+                    testSuiteClassDict["class"]  = testModuleClassName
+                    testSuiteClassDictList.append(testSuiteClassDict)
+
+    # loop through testSuiteClassDictList and start the test class one by one
+    for testSuiteClassDict in testSuiteClassDictList:
+
+        suiteName  = testSuiteClassDict["suite"]
+        moduleName = testSuiteClassDict["module"]
+        className  = testSuiteClassDict["class"]
+
+        # add testsuite directory to sys.path such that the module can be loaded
+        sys.path.append(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + suiteName)
+
+        anonymousLogger.info("=================================================")
+        anonymousLogger.info("Running Test for : ")
+        anonymousLogger.info("    suite  : " + suiteName)
+        anonymousLogger.info("    module : " + moduleName)
+        anonymousLogger.info("    class  : " + className)
+        anonymousLogger.info("=================================================")
+
+        # dynamically loading a module and starting the test class
+        mod      = __import__(moduleName)
+        theClass = getattr(mod, className)
+        instance = theClass(systemTestEnv)
+        instance.runTest()
+
+    print
+    anonymousLogger.info("=================================================")
+    anonymousLogger.info("                 TEST REPORTS")
+    anonymousLogger.info("=================================================")
+    for systemTestResult in systemTestEnv.systemTestResultsList:
+        for key,val in systemTestResult.items():
+            if key == "validation_status":
+                anonymousLogger.info(key + " : ")
+                for validation, status in val.items():
+                     anonymousLogger.info("    " + validation + " : " + status)
+            else:
+                anonymousLogger.info(key + " : " + val)
+        print
+
+# =========================
+# main entry point
+# =========================
+
+main()
+
+

Added: incubator/kafka/branches/0.8/system_test/utils/__init__.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/__init__.py?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/__init__.py (added)
+++ incubator/kafka/branches/0.8/system_test/utils/__init__.py Wed Aug 22 17:16:26 2012
@@ -0,0 +1 @@
+ 

Added: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1376147&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (added)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Wed Aug 22 17:16:26 2012
@@ -0,0 +1,681 @@
+#!/usr/bin/env python
+
+# ===================================
+# kafka_system_test_utils.py
+# ===================================
+
+import datetime
+import inspect
+import json
+import logging
+import os
+import re
+import subprocess
+import sys
+import time
+import traceback
+
+import system_test_utils
+
+from datetime  import datetime
+from time      import mktime
+
+# ====================================================================
+# Two logging formats are defined in system_test/system_test_runner.py
+# ====================================================================
+
+# 1. "namedLogger" is defined to log message in this format:
+#    "%(asctime)s - %(levelname)s - %(message)s %(name_of_class)s"
+#    usage: to log message and showing the class name of the message
+
+logger     = logging.getLogger("namedLogger")
+thisClassName = '(kafka_system_test_utils)'
+d = {'name_of_class': thisClassName}
+
+# 2. "anonymousLogger" is defined to log message in this format:
+#    "%(asctime)s - %(levelname)s - %(message)s"
+#    usage: to log message without showing class name and it's appropriate
+#           for logging generic message such as "sleeping for 5 seconds"
+
+anonLogger = logging.getLogger("anonymousLogger")
+
+
+# =====================================
+# Sample usage of getting testcase env
+# =====================================
+def get_testcase_env(testcaseEnv):
+    anonLogger.info("================================================")
+    anonLogger.info("systemTestBaseDir     : " + testcaseEnv.systemTestBaseDir)
+    anonLogger.info("testSuiteBaseDir      : " + testcaseEnv.testSuiteBaseDir)
+    anonLogger.info("testCaseBaseDir       : " + testcaseEnv.testCaseBaseDir)
+    anonLogger.info("testCaseLogsDir       : " + testcaseEnv.testCaseLogsDir)
+    anonLogger.info("userDefinedEnvVarDict : (testcaseEnv.userDefinedEnvVarDict)")
+    anonLogger.info("================================================")
+
+
+def get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, type):
+
+    defaultLogDir = testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId
+
+    # type is either "metrics" or "dashboards" or "default"
+    if type == "metrics":
+        return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId + "/metrics"
+    elif type == "default" :
+        return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId
+    elif type == "dashboards":
+        return testcaseEnv.testCaseLogsDir + "/dashboards"
+    elif type == "config":
+        return testcaseEnv.testCaseBaseDir + "/config"
+    else:
+        logger.error("unrecognized log directory type : " + type, extra=d)
+        logger.error("returning default log dir : " + defaultLogDir, extra=d)
+        return defaultLogDir
+
+
+def generate_testcase_log_dirs(systemTestEnv, testcaseEnv):
+
+    testcasePathName = testcaseEnv.testCaseBaseDir
+    logger.debug("testcase pathname: " + testcasePathName, extra=d)
+
+    if not os.path.exists(testcasePathName + "/config") : os.makedirs(testcasePathName + "/config")
+    if not os.path.exists(testcasePathName + "/logs")   : os.makedirs(testcasePathName + "/logs")
+
+    dashboardsPathName = testcaseEnv.testCaseLogsDir + "/dashboards"
+    if not os.path.exists(dashboardsPathName) : os.makedirs(dashboardsPathName)
+
+    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
+        entityId = clusterEntityConfigDict["entity_id"]
+        role     = clusterEntityConfigDict["role"]
+
+        logger.debug("entity_id : " + entityId, extra=d)
+        logger.debug("role      : " + role,     extra=d)
+
+        metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
+        if not os.path.exists(metricsPathName) : os.makedirs(metricsPathName)
+
+
+def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
+    anonLogger.info("================================================")
+    anonLogger.info("collecting logs from remote machines")
+    anonLogger.info("================================================")
+
+    testCaseBaseDir = testcaseEnv.testCaseBaseDir
+
+    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
+        hostname   = clusterEntityConfigDict["hostname"]
+        entity_id  = clusterEntityConfigDict["entity_id"]
+        role       = clusterEntityConfigDict["role"]
+
+        logger.debug("entity_id : " + entity_id, extra=d)
+        logger.debug("hostname  : " + hostname,  extra=d)
+        logger.debug("role      : " + role,      extra=d)
+
+        configPathName     = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
+        metricsPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics")
+        dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
+
+        cmdList = ["scp",
+                   hostname + ":" + metricsPathName + "/*",
+                   metricsPathName]
+        cmdStr  = " ".join(cmdList)
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+        system_test_utils.sys_call(cmdStr)
+ 
+
+def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv):
+    testCaseBaseDir = testcaseEnv.testCaseBaseDir
+
+    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
+        hostname   = clusterEntityConfigDict["hostname"]
+        entity_id  = clusterEntityConfigDict["entity_id"]
+        role       = clusterEntityConfigDict["role"]
+
+        logger.debug("entity_id : " + entity_id, extra=d)
+        logger.debug("hostname  : " + hostname, extra=d)
+        logger.debug("role      : " + role, extra=d)
+
+        configPathName     = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
+        metricsPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics")
+        dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
+
+        cmdList = ["ssh " + hostname,
+                   "'mkdir -p",
+                   configPathName,
+                   metricsPathName,
+                   dashboardsPathName + "'"]
+        cmdStr  = " ".join(cmdList)
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+        system_test_utils.sys_call(cmdStr)
+
+
+def init_entity_props(systemTestEnv, testcaseEnv):
+    clusterConfigsList  = systemTestEnv.clusterEntityConfigDictList
+    testcaseConfigsList = testcaseEnv.testcaseConfigsList
+    testcasePathName    = testcaseEnv.testCaseBaseDir
+
+    # consumer config / log files location
+    consEntityIdList   = system_test_utils.get_data_from_list_of_dicts( \
+                             clusterConfigsList, "role", "console_consumer", "entity_id")
+    consLogList        = system_test_utils.get_data_from_list_of_dicts( \
+                             testcaseConfigsList, "entity_id", consEntityIdList[0], "log_filename")
+    consLogPathname    = testcasePathName + "/logs/" + consLogList[0]
+    consCfgList        = system_test_utils.get_data_from_list_of_dicts( \
+                             testcaseConfigsList, "entity_id", consEntityIdList[0], "config_filename")
+    consCfgPathname    = testcasePathName + "/config/" + consCfgList[0]
+
+    # producer config / log files location
+    prodEntityIdList   = system_test_utils.get_data_from_list_of_dicts( \
+                             clusterConfigsList, "role", "producer_performance", "entity_id")
+    prodLogList        = system_test_utils.get_data_from_list_of_dicts( \
+                             testcaseConfigsList, "entity_id", prodEntityIdList[0], "log_filename")
+    prodLogPathname    = testcasePathName + "/logs/" + prodLogList[0]
+    prodCfgList        = system_test_utils.get_data_from_list_of_dicts( \
+                             testcaseConfigsList, "entity_id", prodEntityIdList[0], "config_filename")
+    prodCfgPathname    = testcasePathName + "/config/" + prodCfgList[0]
+
+    testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"]    = consLogPathname
+    testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"] = consCfgPathname
+    testcaseEnv.userDefinedEnvVarDict["producerLogPathName"]    = prodLogPathname
+    testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"] = prodCfgPathname
+
+
+def copy_file_with_dict_values(srcFile, destFile, dictObj):
+    infile  = open(srcFile, "r")
+    inlines = infile.readlines()
+    infile.close()
+
+    outfile = open(destFile, 'w')
+    for line in inlines:
+        for key in dictObj.keys():
+            if (line.startswith(key + "=")):
+                line = key + "=" + dictObj[key] + "\n"
+        outfile.write(line)
+    outfile.close()
+
+
+def start_metrics_collection(jmxHost, jmxPort, mBeanObjectName, mBeanAttributes, entityId, clusterEntityConfigDictList, testcaseEnv):
+    logger.info("starting metrics collection on jmx port: " + jmxPort, extra=d)
+    jmxUrl    = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
+    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
+    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
+    metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", entityId, "metrics")
+
+    startMetricsCmdList = ["ssh " + jmxHost,
+                           "'JAVA_HOME=" + javaHome,
+                           "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
+                           "--jmx-url " + jmxUrl,
+                           "--object-name " + mBeanObjectName + " &> ",
+                           metricsPathName + "/metrics.csv & echo pid:$! > ",
+                           metricsPathName + "/entity_pid'"]
+   
+    startMetricsCommand = " ".join(startMetricsCmdList) 
+    logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
+    system_test_utils.async_sys_call(startMetricsCommand)
+
+    pidCmdStr = "ssh " + jmxHost + " 'cat " + metricsPathName + "/entity_pid'"
+    logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+    subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+
+    # keep track of the remote entity pid in a dictionary
+    for line in subproc.stdout.readlines():
+        if line.startswith("pid"):
+            line = line.rstrip('\n')
+            logger.debug("found pid line: [" + line + "]", extra=d)
+            tokens  = line.split(':')
+            thisPid = tokens[1]
+            testcaseEnv.entityParentPidDict[thisPid] = thisPid
+
+
+def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
+    logger.info("calling generate_properties_files", extra=d)
+
+    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
+    tcPathname    = testcaseEnv.testCaseBaseDir
+    tcConfigsList = testcaseEnv.testcaseConfigsList
+
+    cfgTemplatePathname = os.path.abspath(testsuitePathname + "/config")
+    cfgDestPathname     = os.path.abspath(tcPathname + "/config")
+    logger.info("config template (source) pathname : " + cfgTemplatePathname, extra=d)
+    logger.info("testcase config (dest)   pathname : " + cfgDestPathname, extra=d)
+
+    # loop through all zookeepers (if more than 1) to retrieve host and clientPort
+    # to construct a zk.connect str for broker in the form of:
+    # zk.connect=<host1>:<port2>,<host2>:<port2>
+    zkConnectStr = ""
+    zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
+    for zkDict in zkDictList:
+        entityID       = zkDict["entity_id"]
+        hostname       = zkDict["hostname"]
+        clientPortList = system_test_utils.get_data_from_list_of_dicts(tcConfigsList, "entity_id", entityID, "clientPort")
+        clientPort     = clientPortList[0]
+
+        if ( zkConnectStr.__len__() == 0 ):
+            zkConnectStr = hostname + ":" + clientPort
+        else:
+            zkConnectStr = zkConnectStr + "," + hostname + ":" + clientPort
+
+    # for each entity in the cluster config
+    for clusterCfg in clusterConfigsList:
+        cl_entity_id = clusterCfg["entity_id"]
+
+        for tcCfg in tcConfigsList:
+            if (tcCfg["entity_id"] == cl_entity_id):
+
+                # copy the associated .properties template, update values, write to testcase_<xxx>/config
+
+                if ( clusterCfg["role"] == "broker" ):
+                    tcCfg["zk.connect"] = zkConnectStr
+                    copy_file_with_dict_values(cfgTemplatePathname + "/server.properties", \
+                                               cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
+
+                elif ( clusterCfg["role"] == "zookeeper"):
+                    copy_file_with_dict_values(cfgTemplatePathname + "/zookeeper.properties", \
+                                               cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
+
+                elif ( clusterCfg["role"] == "producer_performance"):
+                    tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
+                    copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties", \
+                                               cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
+
+                elif ( clusterCfg["role"] == "console_consumer"):
+                    tcCfg["zookeeper"] = zkConnectStr
+                    copy_file_with_dict_values(cfgTemplatePathname + "/console_consumer.properties", \
+                                               cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
+                else:
+                    print "    => ", tcCfg
+                    print "UNHANDLED key"
+
+    # scp updated config files to remote hosts
+    scp_file_to_remote_host(clusterConfigsList, testcaseEnv)
+
+
+def scp_file_to_remote_host(clusterEntityConfigDictList, testcaseEnv):
+
+    testcaseConfigsList = testcaseEnv.testcaseConfigsList
+
+    for clusterEntityConfigDict in clusterEntityConfigDictList:
+        hostname         = clusterEntityConfigDict["hostname"]
+        testcasePathName = testcaseEnv.testCaseBaseDir
+
+        cmdStr = "scp " + testcasePathName + "/config/* " + hostname + ":" + testcasePathName + "/config"
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+        system_test_utils.sys_call(cmdStr)
+
+
+def start_zookeepers(systemTestEnv, testcaseEnv):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    zkEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
+                         clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
+
+    for zkEntityId in zkEntityIdList:
+        start_entity_in_background(systemTestEnv, testcaseEnv, zkEntityId)
+
+
+def start_brokers(systemTestEnv, testcaseEnv):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
+                             clusterEntityConfigDictList, "role", "broker", "entity_id")
+
+    for brokerEntityId in brokerEntityIdList:
+        start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
+
+
+def get_leader_elected_log_line(systemTestEnv, testcaseEnv):
+
+    logger.info("looking up leader...", extra=d)
+
+    # keep track of leader related data in this dict such as broker id,
+    # entity id and timestamp and return it to the caller function
+    leaderDict = {} 
+
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
+                             clusterEntityConfigDictList, "role", "broker", "entity_id")
+
+    for brokerEntityId in brokerEntityIdList:
+
+        hostname   = system_test_utils.get_data_by_lookup_keyval( \
+                         clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
+        logFile    = system_test_utils.get_data_by_lookup_keyval( \
+                         testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
+
+        leaderDict["entity_id"] = brokerEntityId
+        leaderDict["hostname"]  = hostname
+
+        logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
+        cmdStrList = ["ssh " + hostname,
+                      "\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
+                      logPathName + "/" + logFile + " | ",
+                      "sort | tail -1\""]
+        cmdStr     = " ".join(cmdStrList)
+
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+        for line in subproc.stdout.readlines():
+
+            line = line.rstrip('\n')
+
+            if testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] in line:
+                logger.info("found the log line : " + line, extra=d)
+                try:
+                    matchObj    = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"], line)
+                    datetimeStr = matchObj.group(1)
+                    datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
+                    unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
+                    #print "{0:.3f}".format(unixTs)
+                    leaderDict["timestamp"] = unixTs
+                    leaderDict["brokerid"]  = matchObj.group(2)
+                    leaderDict["topic"]     = matchObj.group(3)
+                    leaderDict["partition"] = matchObj.group(4)
+                except:
+                    logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
+            #else:
+            #    logger.debug("unmatched line found [" + line + "]", extra=d)
+
+    return leaderDict
+
+
+def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
+
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    # cluster configurations:
+    hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
+    role      = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "role")
+    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
+    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
+    jmxPort   = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
+
+    # testcase configurations:
+    testcaseConfigsList = testcaseEnv.testcaseConfigsList
+    clientPort = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "clientPort")
+    configFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "config_filename")
+    logFile    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename")
+
+    logger.info("starting " + role + " in host [" + hostname + "] on client port [" + clientPort + "]", extra=d)
+
+    configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "config")
+    logPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "default")
+
+    if role == "zookeeper":
+        cmdList = ["ssh " + hostname,
+                  "'JAVA_HOME=" + javaHome,
+                  kafkaHome + "/bin/zookeeper-server-start.sh ",
+                  configPathName + "/" + configFile + " &> ",
+                  logPathName + "/" + logFile + " & echo pid:$! > ",
+                  logPathName + "/entity_" + entityId + "_pid'"]
+
+
+        # construct zk.connect str and update it to testcaseEnv.userDefinedEnvVarDict.zkConnectStr
+        if ( len(testcaseEnv.userDefinedEnvVarDict["zkConnectStr"]) > 0 ):
+            testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = \
+                testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] + "," + hostname + ":" + clientPort
+        else:
+            testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = hostname + ":" + clientPort
+
+    elif role == "broker":
+        cmdList = ["ssh " + hostname,
+                  "'JAVA_HOME=" + javaHome,
+                  "JMX_PORT=" + jmxPort,
+                  kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
+                  configPathName + "/" + configFile + " &> ",
+                  logPathName + "/" + logFile + " & echo pid:$! > ",
+                  logPathName + "/entity_" + entityId + "_pid'"]
+
+    # it seems to be a better idea to launch producer & consumer in separate functions
+    # elif role == "producer_performance":
+    # elif role == "console_consumer":
+
+    cmdStr = " ".join(cmdList)
+
+    logger.debug("executing command: [" + cmdStr + "]", extra=d)
+    system_test_utils.async_sys_call(cmdStr)
+    time.sleep(5)
+
+    pidCmdStr = "ssh " + hostname + " 'cat " + logPathName + "/entity_" + entityId + "_pid'"
+    logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+    subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+
+    # keep track of the remote entity pid in a dictionary
+    for line in subproc.stdout.readlines():
+        if line.startswith("pid"):
+            line = line.rstrip('\n')
+            logger.debug("found pid line: [" + line + "]", extra=d)
+            tokens = line.split(':')
+            testcaseEnv.entityParentPidDict[entityId] = tokens[1]
+
+    # if it is a broker, start metric collection
+    if role == "broker":
+        start_metrics_collection(hostname, jmxPort, "kafka:type=kafka.SocketServerStats", \
+            "AvgFetchRequestMs, AvgProduceRequestMs", entityId, clusterEntityConfigDictList, testcaseEnv)
+
+
+def start_console_consumer(systemTestEnv, testcaseEnv):
+
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
+    for consumerConfig in consumerConfigList:
+        host              = consumerConfig["hostname"]
+        entityId          = consumerConfig["entity_id"]
+        kafkaHome         = system_test_utils.get_data_by_lookup_keyval( \
+                                clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
+        javaHome          = system_test_utils.get_data_by_lookup_keyval( \
+                                clusterEntityConfigDictList, "entity_id", entityId, "java_home")
+        kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
+
+        logger.info("starting console consumer", extra=d)
+
+        consumerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + \
+                                  "/console_consumer.log"
+
+        testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName
+
+        commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"])
+        cmdList = ["ssh " + host,
+                   "'JAVA_HOME=" + javaHome,
+                   kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
+                   commandArgs + " &> " + consumerLogPathName + "'"]
+
+        cmdStr = " ".join(cmdList)
+        logger.debug("executing command: [" + cmdStr + "]", extra=d)
+        system_test_utils.sys_call(cmdStr)
+
+
+
+def start_producer_performance(systemTestEnv, testcaseEnv):
+
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    producerConfigList = system_test_utils.get_dict_from_list_of_dicts( \
+                             clusterEntityConfigDictList, "role", "producer_performance")
+    for producerConfig in producerConfigList:
+        host              = producerConfig["hostname"]
+        entityId          = producerConfig["entity_id"]
+        kafkaHome         = system_test_utils.get_data_by_lookup_keyval( \
+                                clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
+        javaHome          = system_test_utils.get_data_by_lookup_keyval( \
+                                clusterEntityConfigDictList, "entity_id", entityId, "java_home")
+        kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
+
+        logger.info("starting producer preformance", extra=d)
+
+        producerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + \
+                                  "/producer_performance.log"
+
+        testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName
+
+        commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"])
+        cmdList = ["ssh " + host,
+                   "'JAVA_HOME=" + javaHome,
+                   kafkaRunClassBin + " kafka.perf.ProducerPerformance",
+                   commandArgs + " &> " + producerLogPathName + "'"]
+
+        cmdStr = " ".join(cmdList)
+        logger.debug("executing command: [" + cmdStr + "]", extra=d)
+        system_test_utils.sys_call(cmdStr) 
+
+
+def stop_remote_entity(systemTestEnv, entityId, parentPid):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
+    pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
+
+    logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
+    system_test_utils.sigterm_remote_process(hostname, pidStack)
+    time.sleep(1)
+    system_test_utils.sigkill_remote_process(hostname, pidStack)
+
+
+def create_topic(systemTestEnv, testcaseEnv):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts( \
+                          clusterEntityConfigDictList, "role", "producer_performance")
+    prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts( \
+                          testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"])
+    prodTopicList   = prodPerfCfgDict[0]["topic"].split(',')
+
+    zkEntityId      = system_test_utils.get_data_by_lookup_keyval( \
+                          clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
+    zkHost          = system_test_utils.get_data_by_lookup_keyval( \
+                          clusterEntityConfigDictList, "role", "zookeeper", "hostname")
+    kafkaHome       = system_test_utils.get_data_by_lookup_keyval( \
+                          clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
+    javaHome        = system_test_utils.get_data_by_lookup_keyval( \
+                          clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
+    createTopicBin  = kafkaHome + "/bin/kafka-create-topic.sh"
+
+    logger.info("zkEntityId : " + zkEntityId, extra=d)
+    logger.info("createTopicBin : " + createTopicBin, extra=d)
+
+    for topic in prodTopicList:
+        logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] + "]", extra=d) 
+        cmdList = ["ssh " + zkHost,
+                   "'JAVA_HOME=" + javaHome,
+                   createTopicBin,
+                   " --topic "     + topic,
+                   " --zookeeper " + testcaseEnv.userDefinedEnvVarDict["zkConnectStr"],
+                   " --replica "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
+                   " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " &> ",
+                   testcaseEnv.testCaseBaseDir + "/logs/create_topic.log'"]
+
+        cmdStr = " ".join(cmdList)
+        logger.debug("executing command: [" + cmdStr + "]", extra=d)
+        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+
+
+def get_message_id(logPathName):
+    logLines      = open(logPathName, "r").readlines()
+    messageIdList = []
+
+    for line in logLines:
+        if not "MessageID" in line:
+            continue
+        else:
+            matchObj = re.match('.*MessageID:(.*?):', line)
+            messageIdList.append( matchObj.group(1) )
+
+    return messageIdList
+
+
+def validate_data_matched(systemTestEnv, testcaseEnv):
+    validationStatusDict        = testcaseEnv.validationStatusDict
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    producerEntityId = system_test_utils.get_data_by_lookup_keyval( \
+                           clusterEntityConfigDictList, "role", "producer_performance", "entity_id")
+    consumerEntityId = system_test_utils.get_data_by_lookup_keyval( \
+                           clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
+    msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \
+                           testcaseEnv, "console_consumer", consumerEntityId, "default") + \
+                           "/msg_id_missing_in_consumer.log"
+
+    producerMsgIdList  = get_message_id(testcaseEnv.userDefinedEnvVarDict["producerLogPathName"])
+    consumerMsgIdList  = get_message_id(testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"])
+    producerMsgIdSet   = set(producerMsgIdList)
+    consumerMsgIdSet   = set(consumerMsgIdList)
+
+    missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
+
+    outfile = open(msgIdMissingInConsumerLogPathName, "w")
+    for id in missingMsgIdInConsumer:
+        outfile.write(id + "\n")
+    outfile.close()
+
+    logger.info("no. of unique messages sent from publisher  : " + str(len(producerMsgIdSet)), extra=d)
+    logger.info("no. of unique messages received by consumer : " + str(len(producerMsgIdSet)), extra=d)
+
+    if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
+        validationStatusDict["Validate for data matched"] = "PASSED"
+        return True
+    else:
+        validationStatusDict["Validate for data matched"] = "FAILED"
+        logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)
+        return False
+     
+
+def validate_leader_election_successful(testcaseEnv, leaderDict, validationStatusDict):
+
+    if ( len(leaderDict) > 0 ):
+        try:
+            leaderBrokerId = leaderDict["brokerid"]
+            leaderEntityId = leaderDict["entity_id"]
+            leaderPid      = testcaseEnv.entityParentPidDict[leaderEntityId]
+            hostname       = leaderDict["hostname"]
+
+            logger.info("found leader in entity [" + leaderEntityId + "] with brokerid [" + \
+                leaderBrokerId + "] for partition [" + leaderDict["partition"] + "]", extra=d)
+            validationStatusDict["Validate leader election successful"] = "PASSED"
+            return True
+        except Exception, e:
+            logger.error("leader info not completed:", extra=d)
+            validationStatusDict["Validate leader election successful"] = "FAILED"
+            print leaderDict
+            print e
+            return False
+    else:
+        validationStatusDict["Validate leader election successful"] = "FAILED"
+        return False
+
+
+def cleanup_data_at_remote_hosts(systemTestEnv, testcaseEnv):
+
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    testcaseConfigsList         = testcaseEnv.testcaseConfigsList
+
+    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
+
+        hostname         = clusterEntityConfigDict["hostname"]
+        entityId         = clusterEntityConfigDict["entity_id"]
+        role             = clusterEntityConfigDict["role"]
+        #testcasePathName = testcaseEnv.testcaseBaseDir
+        cmdStr           = ""
+        dataDir          = ""
+
+        logger.info("cleaning up data dir on host: [" + hostname + "]", extra=d)
+
+        if role == 'zookeeper':
+            dataDir = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "dataDir")
+        elif role == 'broker':
+            dataDir = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log.dir")
+        else:
+            logger.info("skipping role [" + role + "] on host : [" + hostname + "]", extra=d)
+            continue
+
+        cmdStr  = "ssh " + hostname + " 'rm -r " + dataDir + "/*'"
+
+        if not dataDir.startswith("/tmp"):
+            logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
+            logger.warn("check config file: system_test/cluster_config.properties", extra=d)
+            logger.warn("aborting test...", extra=d)
+            sys.exit(1)
+
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+        system_test_utils.sys_call(cmdStr)
+
+    
+



Mime
View raw message