kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [01/13] kafka git commit: KAFKA-2715: Removed previous system_test folder
Date Fri, 30 Oct 2015 22:13:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c001b2040 -> d50499a0e


http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/metrics.py
----------------------------------------------------------------------
diff --git a/system_test/utils/metrics.py b/system_test/utils/metrics.py
deleted file mode 100644
index 3e66348..0000000
--- a/system_test/utils/metrics.py
+++ /dev/null
@@ -1,298 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#!/usr/bin/env python
-
-# ===================================
-# file: metrics.py
-# ===================================
-
-import inspect
-import json
-import logging
-import os
-import signal
-import subprocess
-import sys
-import traceback
-
-import csv
-import time 
-import matplotlib as mpl
-mpl.use('Agg')
-import matplotlib.pyplot as plt
-from collections import namedtuple
-import numpy
-
-from pyh import *
-import kafka_system_test_utils
-import system_test_utils
-
-logger     = logging.getLogger("namedLogger")
-thisClassName = '(metrics)'
-d = {'name_of_class': thisClassName}
-
-attributeNameToNameInReportedFileMap = {
-    'Min': 'min',
-    'Max': 'max',
-    'Mean': 'mean',
-    '50thPercentile': 'median',
-    'StdDev': 'stddev',
-    '95thPercentile': '95%',
-    '99thPercentile': '99%',
-    '999thPercentile': '99.9%',
-    'Count': 'count',
-    'OneMinuteRate': '1 min rate',
-    'MeanRate': 'mean rate',
-    'FiveMinuteRate': '5 min rate',
-    'FifteenMinuteRate': '15 min rate',
-    'Value': 'value'
-}
-
-def getCSVFileNameFromMetricsMbeanName(mbeanName):
-    return mbeanName.replace(":type=", ".").replace(",name=", ".") + ".csv"
-
-def read_metrics_definition(metricsFile):
-    metricsFileData = open(metricsFile, "r").read()
-    metricsJsonData = json.loads(metricsFileData)
-    allDashboards = metricsJsonData['dashboards']
-    allGraphs = []
-    for dashboard in allDashboards:
-        dashboardName = dashboard['name']
-        graphs = dashboard['graphs']
-        for graph in graphs:
-            bean = graph['bean_name']
-            allGraphs.append(graph)
-            attributes = graph['attributes']
-            #print "Filtering on attributes " + attributes     
-    return allGraphs
-            
-def get_dashboard_definition(metricsFile, role):
-    metricsFileData = open(metricsFile, "r").read()
-    metricsJsonData = json.loads(metricsFileData)
-    allDashboards = metricsJsonData['dashboards']
-    dashboardsForRole = []
-    for dashboard in allDashboards:
-        if dashboard['role'] == role:
-            dashboardsForRole.append(dashboard) 
-    return dashboardsForRole
-
-def ensure_valid_headers(headers, attributes):
-    if headers[0] != "# time":
-        raise Exception("First column should be time")
-    for header in headers:
-        logger.debug(header, extra=d)
-    # there should be exactly one column with a name that matches attributes
-    try:
-        attributeColumnIndex = headers.index(attributes)
-        return attributeColumnIndex
-    except ValueError as ve:
-        #print "#### attributes : ", attributes
-        #print "#### headers    : ", headers
-        raise Exception("There should be exactly one column that matches attribute: {0} in".format(attributes) +  
-                        " headers: {0}".format(",".join(headers)))
-        
-def plot_graphs(inputCsvFiles, labels, title, xLabel, yLabel, attribute, outputGraphFile):
-    if not inputCsvFiles: return
-
-    # create empty plot
-    fig=plt.figure()
-    fig.subplots_adjust(bottom=0.2)
-    ax=fig.add_subplot(111)
-    labelx = -0.3  # axes coords
-    ax.set_xlabel(xLabel)
-    ax.set_ylabel(yLabel)
-    ax.grid()
-    #ax.yaxis.set_label_coords(labelx, 0.5)
-    Coordinates = namedtuple("Coordinates", 'x y')
-    plots = []
-    coordinates = []
-    # read data for all files, organize by label in a dict
-    for fileAndLabel in zip(inputCsvFiles, labels):
-        inputCsvFile = fileAndLabel[0]
-        label = fileAndLabel[1]
-        csv_reader = list(csv.reader(open(inputCsvFile, "rb")))
-        x,y = [],[]
-        xticks_labels = []
-        try:
-            # read first line as the headers
-            headers = csv_reader.pop(0)
-            attributeColumnIndex = ensure_valid_headers(headers, attributeNameToNameInReportedFileMap[attribute])
-            logger.debug("Column index for attribute {0} is {1}".format(attribute, attributeColumnIndex), extra=d)
-            start_time = (int)(os.path.getctime(inputCsvFile) * 1000)
-            int(csv_reader[0][0])
-            for line in csv_reader:
-                if(len(line) == 0):
-                    continue
-                yVal = float(line[attributeColumnIndex])                
-                xVal = int(line[0])
-                y.append(yVal)
-                epoch= start_time + int(line[0])
-                x.append(xVal)
-                xticks_labels.append(time.strftime("%H:%M:%S", time.localtime(epoch)))
-                coordinates.append(Coordinates(xVal, yVal))
-            p1 = ax.plot(x,y)
-            plots.append(p1)
-        except Exception as e:
-            logger.error("ERROR while plotting data for {0}: {1}".format(inputCsvFile, e), extra=d)
-            traceback.print_exc()
-    # find xmin, xmax, ymin, ymax from all csv files
-    xmin = min(map(lambda coord: coord.x, coordinates))
-    xmax = max(map(lambda coord: coord.x, coordinates))
-    ymin = min(map(lambda coord: coord.y, coordinates))
-    ymax = max(map(lambda coord: coord.y, coordinates))
-    # set x and y axes limits
-    plt.xlim(xmin, xmax)
-    plt.ylim(ymin, ymax)
-    # set ticks accordingly
-    xticks = numpy.arange(xmin, xmax, 0.2*xmax)
-#    yticks = numpy.arange(ymin, ymax)
-    plt.xticks(xticks,xticks_labels,rotation=17)
-#    plt.yticks(yticks)
-    plt.legend(plots,labels, loc=2)
-    plt.title(title)
-    plt.savefig(outputGraphFile)
-
-def draw_all_graphs(metricsDescriptionFile, testcaseEnv, clusterConfig):
-    # go through each role and plot graphs for the role's metrics
-    roles = set(map(lambda config: config['role'], clusterConfig))
-    for role in roles:
-        dashboards = get_dashboard_definition(metricsDescriptionFile, role)
-        entities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
-        for dashboard in dashboards:
-            graphs = dashboard['graphs']
-            # draw each graph for all entities
-            draw_graph_for_role(graphs, entities, role, testcaseEnv)
-        
-def draw_graph_for_role(graphs, entities, role, testcaseEnv):
-    for graph in graphs:
-        graphName = graph['graph_name'] 
-        yLabel = graph['y_label']
-        inputCsvFiles = []
-        graphLegendLabels = []
-        for entity in entities:
-            entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entity['entity_id'], "metrics")
-            entityMetricCsvFile = entityMetricsDir + "/" + getCSVFileNameFromMetricsMbeanName(graph['bean_name'])
-            if(not os.path.exists(entityMetricCsvFile)):
-                logger.warn("The file {0} does not exist for plotting".format(entityMetricCsvFile), extra=d)
-            else:
-                inputCsvFiles.append(entityMetricCsvFile)
-                graphLegendLabels.append(role + "-" + entity['entity_id'])
-#            print "Plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
-        try:
-            # plot one graph per mbean attribute
-            labels = graph['y_label'].split(',')
-            fullyQualifiedAttributeNames = map(lambda attribute: graph['bean_name'] + ':' + attribute, 
-                                           graph['attributes'].split(','))
-            attributes = graph['attributes'].split(',')
-            for labelAndAttribute in zip(labels, fullyQualifiedAttributeNames, attributes):            
-                outputGraphFile = testcaseEnv.testCaseDashboardsDir + "/" + role + "/" + labelAndAttribute[1] + ".svg"            
-                plot_graphs(inputCsvFiles, graphLegendLabels, graph['graph_name'] + '-' + labelAndAttribute[2], 
-                            "time", labelAndAttribute[0], labelAndAttribute[2], outputGraphFile)
-#            print "Finished plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
-        except Exception as e:
-            logger.error("ERROR while plotting graph {0}: {1}".format(outputGraphFile, e), extra=d)
-            traceback.print_exc()
-
-def build_all_dashboards(metricsDefinitionFile, testcaseDashboardsDir, clusterConfig):
-    metricsHtmlFile = testcaseDashboardsDir + "/metrics.html"
-    centralDashboard = PyH('Kafka Metrics Dashboard')
-    centralDashboard << h1('Kafka Metrics Dashboard', cl='center')
-    roles = set(map(lambda config: config['role'], clusterConfig))
-    for role in roles:
-        entities = kafka_system_test_utils.get_entities_for_role(clusterConfig, role)
-        dashboardPagePath = build_dashboard_for_role(metricsDefinitionFile, role, 
-                                                     entities, testcaseDashboardsDir)
-        centralDashboard << a(role, href = dashboardPagePath)
-        centralDashboard << br()
-            
-    centralDashboard.printOut(metricsHtmlFile)
-
-def build_dashboard_for_role(metricsDefinitionFile, role, entities, testcaseDashboardsDir):
-    # build all dashboards for the input entity's based on its role. It can be one of kafka, zookeeper, producer
-    # consumer
-    dashboards = get_dashboard_definition(metricsDefinitionFile, role)
-    entityDashboard = PyH('Kafka Metrics Dashboard for ' + role)
-    entityDashboard << h1('Kafka Metrics Dashboard for ' + role, cl='center')
-    entityDashboardHtml = testcaseDashboardsDir + "/" + role + "-dashboards.html"
-    for dashboard in dashboards:
-        # place the graph svg files in this dashboard
-        allGraphs = dashboard['graphs']
-        for graph in allGraphs:
-            attributes = map(lambda attribute: graph['bean_name'] + ':' + attribute, 
-                                           graph['attributes'].split(','))
-            for attribute in attributes:                
-                graphFileLocation = testcaseDashboardsDir + "/" + role + "/" + attribute + ".svg"
-                entityDashboard << embed(src = graphFileLocation, type = "image/svg+xml")
-    entityDashboard.printOut(entityDashboardHtml)
-    return entityDashboardHtml
-
-def start_metrics_collection(jmxHost, jmxPort, role, entityId, systemTestEnv, testcaseEnv):
-    logger.info("starting metrics collection on jmx port : " + jmxPort, extra=d)
-    jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi"
-    clusterConfig = systemTestEnv.clusterEntityConfigDictList
-    metricsDefinitionFile = systemTestEnv.METRICS_PATHNAME
-    entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
-    dashboardsForRole = get_dashboard_definition(metricsDefinitionFile, role)
-    mbeansForRole = get_mbeans_for_role(dashboardsForRole)
-    
-    kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "kafka_home")
-    javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterConfig, "entity_id", entityId, "java_home")
-    
-    for mbean in mbeansForRole:
-        outputCsvFile = entityMetricsDir + "/" + mbean + ".csv"
-        startMetricsCmdList = ["ssh " + jmxHost,
-                               "'JAVA_HOME=" + javaHome,
-                               "JMX_PORT= " + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.JmxTool",
-                               "--jmx-url " + jmxUrl,
-                               "--object-name " + mbean + " 1> ",
-                                outputCsvFile + " & echo pid:$! > ",
-                                entityMetricsDir + "/entity_pid'"]
-
-        startMetricsCommand = " ".join(startMetricsCmdList) 
-        logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
-        system_test_utils.async_sys_call(startMetricsCommand)
-        time.sleep(1)
-
-        pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid' 2> /dev/null"
-        logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
-        subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
-
-        # keep track of JMX ppid in a dictionary of entity_id to list of JMX ppid
-        # testcaseEnv.entityJmxParentPidDict:
-        #   key: entity_id
-        #   val: list of JMX ppid associated to that entity_id
-        #   { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
-        for line in subproc.stdout.readlines():
-            line = line.rstrip('\n')
-            logger.debug("line: [" + line + "]", extra=d)
-            if line.startswith("pid"):
-                logger.debug("found pid line: [" + line + "]", extra=d)
-                tokens  = line.split(':')
-                thisPid = tokens[1]
-                if entityId not in testcaseEnv.entityJmxParentPidDict:
-                    testcaseEnv.entityJmxParentPidDict[entityId] = []
-                testcaseEnv.entityJmxParentPidDict[entityId].append(thisPid)
-                #print "\n#### testcaseEnv.entityJmxParentPidDict ", testcaseEnv.entityJmxParentPidDict, "\n"
-
-
-def stop_metrics_collection(jmxHost, jmxPort):
-    logger.info("stopping metrics collection on " + jmxHost + ":" + jmxPort, extra=d)
-    system_test_utils.sys_call("ps -ef | grep JmxTool | grep -v grep | grep " + jmxPort + " | awk '{print $2}' | xargs kill -9")
-
-def get_mbeans_for_role(dashboardsForRole):
-    graphs = reduce(lambda x,y: x+y, map(lambda dashboard: dashboard['graphs'], dashboardsForRole))
-    return set(map(lambda metric: metric['bean_name'], graphs))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/pyh.py
----------------------------------------------------------------------
diff --git a/system_test/utils/pyh.py b/system_test/utils/pyh.py
deleted file mode 100644
index cff06f4..0000000
--- a/system_test/utils/pyh.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# @file: pyh.py
-# @purpose: a HTML tag generator
-# @author: Emmanuel Turlay <turlay@cern.ch>
-
-__doc__ = """The pyh.py module is the core of the PyH package. PyH lets you
-generate HTML tags from within your python code.
-See http://code.google.com/p/pyh/ for documentation.
-"""
-__author__ = "Emmanuel Turlay <turlay@cern.ch>"
-__version__ = '$Revision: 63 $'
-__date__ = '$Date: 2010-05-21 03:09:03 +0200 (Fri, 21 May 2010) $'
-
-from sys import _getframe, stdout, modules, version
-nOpen={}
-
-nl = '\n'
-doctype = '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">\n'
-charset = '<meta http-equiv="Content-Type" content="text/html;charset=utf-8" />\n'
-
-tags = ['html', 'body', 'head', 'link', 'meta', 'div', 'p', 'form', 'legend', 
-        'input', 'select', 'span', 'b', 'i', 'option', 'img', 'script',
-        'table', 'tr', 'td', 'th', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
-        'fieldset', 'a', 'title', 'body', 'head', 'title', 'script', 'br', 'table',
-        'ul', 'li', 'ol', 'embed']
-
-selfClose = ['input', 'img', 'link', 'br']
-
-class Tag(list):
-    tagname = ''
-    
-    def __init__(self, *arg, **kw):
-        self.attributes = kw
-        if self.tagname : 
-            name = self.tagname
-            self.isSeq = False
-        else: 
-            name = 'sequence'
-            self.isSeq = True
-        self.id = kw.get('id', name)
-        #self.extend(arg)
-        for a in arg: self.addObj(a)
-
-    def __iadd__(self, obj):
-        if isinstance(obj, Tag) and obj.isSeq:
-            for o in obj: self.addObj(o)
-        else: self.addObj(obj)
-        return self
-    
-    def addObj(self, obj):
-        if not isinstance(obj, Tag): obj = str(obj)
-        id=self.setID(obj)
-        setattr(self, id, obj)
-        self.append(obj)
-
-    def setID(self, obj):
-        if isinstance(obj, Tag):
-            id = obj.id
-            n = len([t for t in self if isinstance(t, Tag) and t.id.startswith(id)])
-        else:
-            id = 'content'
-            n = len([t for t in self if not isinstance(t, Tag)])
-        if n: id = '%s_%03i' % (id, n)
-        if isinstance(obj, Tag): obj.id = id
-        return id
-
-    def __add__(self, obj):
-        if self.tagname: return Tag(self, obj)
-        self.addObj(obj)
-        return self
-
-    def __lshift__(self, obj):
-        self += obj
-        if isinstance(obj, Tag): return obj
-
-    def render(self):
-        result = ''
-        if self.tagname:
-            result = '<%s%s%s>' % (self.tagname, self.renderAtt(), self.selfClose()*' /')
-        if not self.selfClose():
-            for c in self:
-                if isinstance(c, Tag):
-                    result += c.render()
-                else: result += c
-            if self.tagname: 
-                result += '</%s>' % self.tagname
-        result += '\n'
-        return result
-
-    def renderAtt(self):
-        result = ''
-        for n, v in self.attributes.iteritems():
-            if n != 'txt' and n != 'open':
-                if n == 'cl': n = 'class'
-                result += ' %s="%s"' % (n, v)
-        return result
-
-    def selfClose(self):
-        return self.tagname in selfClose        
-    
-def TagFactory(name):
-    class f(Tag):
-        tagname = name
-    f.__name__ = name
-    return f
-
-thisModule = modules[__name__]
-
-for t in tags: setattr(thisModule, t, TagFactory(t)) 
-
-def ValidW3C():
-    out = a(img(src='http://www.w3.org/Icons/valid-xhtml10', alt='Valid XHTML 1.0 Strict'), href='http://validator.w3.org/check?uri=referer')
-    return out
-
-class PyH(Tag):
-    tagname = 'html'
-    
-    def __init__(self, name='MyPyHPage'):
-        self += head()
-        self += body()
-        self.attributes = dict(xmlns='http://www.w3.org/1999/xhtml', lang='en')
-        self.head += title(name)
-
-    def __iadd__(self, obj):
-        if isinstance(obj, head) or isinstance(obj, body): self.addObj(obj)
-        elif isinstance(obj, meta) or isinstance(obj, link): self.head += obj
-        else:
-            self.body += obj
-            id=self.setID(obj)
-            setattr(self, id, obj)
-        return self
-
-    def addJS(self, *arg):
-        for f in arg: self.head += script(type='text/javascript', src=f)
-
-    def addCSS(self, *arg):
-        for f in arg: self.head += link(rel='stylesheet', type='text/css', href=f)
-    
-    def printOut(self,file=''):
-        if file: f = open(file, 'w')
-        else: f = stdout
-        f.write(doctype)
-        f.write(self.render())
-        f.flush()
-        if file: f.close()
-    

http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/replication_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/replication_utils.py b/system_test/utils/replication_utils.py
deleted file mode 100644
index cfd80b2..0000000
--- a/system_test/utils/replication_utils.py
+++ /dev/null
@@ -1,70 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#!/usr/bin/env python
-
-# =================================================================
-# replication_utils.py
-# - This module defines constant values specific to Kafka Replication
-#   and also provides helper functions for Replication system test.
-# =================================================================
-
-import logging
-import sys
-
-class ReplicationUtils(object):
-
-    thisClassName = '(ReplicationUtils)'
-    d = {'name_of_class': thisClassName}
-
-    logger     = logging.getLogger("namedLogger")
-    anonLogger = logging.getLogger("anonymousLogger")
-
-    def __init__(self, testClassInstance):
-        super(ReplicationUtils, self).__init__()
-        self.logger.debug("#### constructor inside ReplicationUtils", extra=self.d)
-
-        # leader attributes
-        self.isLeaderLogPattern             = "Completed the leader state transition"
-        self.brokerShutDownCompletedPattern = "shut down completed"
-
-        self.leaderAttributesDict = {}
-
-        self.leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] = \
-            self.brokerShutDownCompletedPattern
-
-        self.leaderAttributesDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"] = \
-            "\[(.*?)\] .* \[Kafka Server (.*?)\], " + \
-            self.brokerShutDownCompletedPattern
-
-        self.leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] = \
-            self.isLeaderLogPattern
-
-        self.leaderAttributesDict["REGX_LEADER_ELECTION_PATTERN"]  = \
-            "\[(.*?)\] .* Broker (.*?): " + \
-            self.leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] + \
-            " for topic (.*?) partition (.*?) \(.*"
-
-        # Controller attributes
-        self.isControllerLogPattern    = "Controller startup complete"
-        self.controllerAttributesDict  = {}
-        self.controllerAttributesDict["CONTROLLER_STARTUP_COMPLETE_MSG"] = self.isControllerLogPattern
-        self.controllerAttributesDict["REGX_CONTROLLER_STARTUP_PATTERN"] = "\[(.*?)\] .* \[Controller (.*?)\]: " + \
-            self.controllerAttributesDict["CONTROLLER_STARTUP_COMPLETE_MSG"]
-
-        # Data Loss Percentage Threshold in Ack = 1 cases
-        self.ackOneDataLossThresholdPercent = 5.0
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/setup_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/setup_utils.py b/system_test/utils/setup_utils.py
deleted file mode 100644
index 0e8b7f9..0000000
--- a/system_test/utils/setup_utils.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#!/usr/bin/env python
-
-# =================================================================
-# setup_utils.py
-# - This module provides some basic helper functions.
-# =================================================================
-
-import logging
-import kafka_system_test_utils
-import sys
-
-class SetupUtils(object):
-
-    # dict to pass user-defined attributes to logger argument: "extra"
-    # to use: just update "thisClassName" to the appropriate value
-    thisClassName = '(ReplicaBasicTest)'
-    d = {'name_of_class': thisClassName}
-
-    logger     = logging.getLogger("namedLogger")
-    anonLogger = logging.getLogger("anonymousLogger")
-
-    def __init__(self):
-        d = {'name_of_class': self.__class__.__name__}
-        self.logger.debug("#### constructor inside SetupUtils", extra=self.d)
-
-    def log_message(self, message):
-        print
-        self.anonLogger.info("======================================================")
-        self.anonLogger.info(message)
-        self.anonLogger.info("======================================================")
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/system_test_utils.py b/system_test/utils/system_test_utils.py
deleted file mode 100644
index e8529cd..0000000
--- a/system_test/utils/system_test_utils.py
+++ /dev/null
@@ -1,638 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#!/usr/bin/env python
-
-# ===================================
-# system_test_utils.py
-# ===================================
-
-import copy
-import difflib
-import inspect
-import json
-import logging
-import os
-import re
-import signal
-import socket
-import subprocess
-import sys
-import time
-
-logger  = logging.getLogger("namedLogger")
-aLogger = logging.getLogger("anonymousLogger")
-thisClassName = '(system_test_utils)'
-d = {'name_of_class': thisClassName}
-
-
-def get_current_unix_timestamp():
-    ts = time.time()
-    return "{0:.6f}".format(ts)
-
-
-def get_local_hostname():
-    return socket.gethostname()
-
-
-def sys_call(cmdStr):
-    output = ""
-    #logger.info("executing command [" + cmdStr + "]", extra=d)
-    p = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-    for line in p.stdout.readlines():
-        output += line
-    return output
-
-
-def remote_async_sys_call(host, cmd):
-    cmdStr = "ssh " + host + " \"" + cmd + "\""
-    logger.info("executing command [" + cmdStr + "]", extra=d)
-    async_sys_call(cmdStr)
-
-
-def remote_sys_call(host, cmd):
-    cmdStr = "ssh " + host + " \"" + cmd + "\""
-    logger.info("executing command [" + cmdStr + "]", extra=d)
-    sys_call(cmdStr)
-
-
-def get_dir_paths_with_prefix(fullPath, dirNamePrefix):
-    dirsList = []
-    for dirName in os.listdir(fullPath):
-        if not os.path.isfile(dirName) and dirName.startswith(dirNamePrefix):
-            dirsList.append(os.path.abspath(fullPath + "/" + dirName))
-    return dirsList
-
-
-def get_testcase_prop_json_pathname(testcasePathName):
-    testcaseDirName = os.path.basename(testcasePathName)
-    return testcasePathName + "/" + testcaseDirName + "_properties.json"
-
-def get_json_list_data(infile):
-    json_file_str = open(infile, "r").read()
-    json_data     = json.loads(json_file_str)
-    data_list     = []
-
-    for key,settings in json_data.items():
-        if type(settings) == list:
-            for setting in settings:
-                if type(setting) == dict:
-                    kv_dict = {}
-                    for k,v in setting.items():
-                        kv_dict[k] = v
-                    data_list.append(kv_dict)
-
-    return data_list
-
-
-def get_dict_from_list_of_dicts(listOfDicts, lookupKey, lookupVal):
-    # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
-    # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'}
-    #
-    # Usage:
-    #
-    # 1. get_data_from_list_of_dicts(self.clusterConfigsList, "entity_id", "0", "role")
-    #    returns:
-    #        {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
-    #
-    # 2. get_data_from_list_of_dicts(self.clusterConfigsList, None, None, "role")
-    #    returns:
-    #        {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
-    #        {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'}
-
-    retList = []
-    if ( lookupVal is None or lookupKey is None ):
-        for dict in listOfDicts:
-            for k,v in dict.items():
-                if ( k == fieldToRetrieve ):               # match with fieldToRetrieve ONLY
-                    retList.append( dict )
-    else:
-        for dict in listOfDicts:
-            for k,v in dict.items():
-                if ( k == lookupKey and v == lookupVal ):  # match with lookupKey and lookupVal
-                    retList.append( dict )
-
-    return retList
-
-
-def get_data_from_list_of_dicts(listOfDicts, lookupKey, lookupVal, fieldToRetrieve):
-    # Sample List of Dicts:
-    # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '0', 'role': 'zookeeper', 'hostname': 'localhost'}
-    # {'kafka_home': '/mnt/u001/kafka_0.8_sanity', 'entity_id': '1', 'role': 'broker', 'hostname': 'localhost'}
-    #
-    # Usage:
-    # 1. get_data_from_list_of_dicts(self.clusterConfigsList, "entity_id", "0", "role")
-    #    => returns ['zookeeper']
-    # 2. get_data_from_list_of_dicts(self.clusterConfigsList, None, None, "role")
-    #    => returns ['zookeeper', 'broker']
-
-    retList = []
-    if ( lookupVal is None or lookupKey is None ):
-        for dict in listOfDicts:
-            for k,v in dict.items():
-                if ( k == fieldToRetrieve ):               # match with fieldToRetrieve ONLY
-                    try:
-                        retList.append( dict[fieldToRetrieve] )
-                    except:
-                        logger.debug("field not found: " + fieldToRetrieve, extra=d)
-    else:
-        for dict in listOfDicts:
-            for k,v in dict.items():
-                if ( k == lookupKey and v == lookupVal ):  # match with lookupKey and lookupVal
-                    try:
-                        retList.append( dict[fieldToRetrieve] )
-                    except:
-                        logger.debug("field not found: " + fieldToRetrieve, extra=d)
-    return retList
-
-def get_data_by_lookup_keyval(listOfDict, lookupKey, lookupVal, fieldToRetrieve):
-    returnValue = ""
-    returnValuesList = get_data_from_list_of_dicts(listOfDict, lookupKey, lookupVal, fieldToRetrieve)
-    if len(returnValuesList) > 0:
-        returnValue = returnValuesList[0]
-
-    return returnValue
-
-def get_json_dict_data(infile):
-    json_file_str = open(infile, "r").read()
-    json_data     = json.loads(json_file_str)
-    data_dict     = {}
-
-    for key,val in json_data.items():
-        if ( type(val) != list ): 
-            data_dict[key] = val
-
-    return data_dict
-
-def get_remote_child_processes(hostname, pid):
-    pidStack = []
-
-    cmdList = ['''ssh ''' + hostname,
-              ''''pid=''' + pid + '''; prev_pid=""; echo $pid;''',
-              '''while [[ "x$pid" != "x" ]];''',
-              '''do prev_pid=$pid;''',
-              '''  for child in $(ps -o pid,ppid ax | awk "{ if ( \$2 == $pid ) { print \$1 }}");''',
-              '''    do echo $child; pid=$child;''',
-              '''  done;''',
-              '''  if [ $prev_pid == $pid ]; then''',
-              '''    break;''',
-              '''  fi;''',
-              '''done' 2> /dev/null''']
-
-    cmdStr = " ".join(cmdList)
-    logger.debug("executing command [" + cmdStr, extra=d)
-
-    subproc = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE)
-    for line in subproc.stdout.readlines():
-        procId = line.rstrip('\n')
-        pidStack.append(procId)
-    return pidStack
-
-def get_child_processes(pid):
-    pidStack   = []
-    currentPid = pid
-    parentPid  = ""
-    pidStack.append(pid)
-
-    while ( len(currentPid) > 0 ):
-        psCommand = subprocess.Popen("ps -o pid --ppid %s --noheaders" % currentPid, shell=True, stdout=subprocess.PIPE)
-        psOutput  = psCommand.stdout.read()
-        outputLine = psOutput.rstrip('\n')
-        childPid   = outputLine.lstrip()
-
-        if ( len(childPid) > 0 ):
-            pidStack.append(childPid)
-            currentPid = childPid
-        else:
-            break
-    return pidStack
-
-def sigterm_remote_process(hostname, pidStack):
-
-    while ( len(pidStack) > 0 ):
-        pid = pidStack.pop()
-        cmdStr = "ssh " + hostname + " 'kill -15 " + pid + "'"
-
-        try:
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            sys_call_return_subproc(cmdStr)
-        except:
-            print "WARN - pid:",pid,"not found"
-            raise
-
-def sigkill_remote_process(hostname, pidStack):
-
-    while ( len(pidStack) > 0 ):
-        pid = pidStack.pop()
-        cmdStr = "ssh " + hostname + " 'kill -9 " + pid + "'"
-
-        try:
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            sys_call_return_subproc(cmdStr)
-        except:
-            print "WARN - pid:",pid,"not found"
-            raise
-
-def simulate_garbage_collection_pause_in_remote_process(hostname, pidStack, pauseTimeInSeconds):
-    pausedPidStack = []
-
-    # pause the processes
-    while len(pidStack) > 0:
-        pid = pidStack.pop()
-        pausedPidStack.append(pid)
-        cmdStr = "ssh " + hostname + " 'kill -SIGSTOP " + pid + "'"
-
-        try:
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            sys_call_return_subproc(cmdStr)
-        except:
-            print "WARN - pid:",pid,"not found"
-            raise
-
-    time.sleep(int(pauseTimeInSeconds))
-
-    # resume execution of the processes
-    while len(pausedPidStack) > 0:
-        pid = pausedPidStack.pop()
-        cmdStr = "ssh " + hostname + " 'kill -SIGCONT " + pid + "'"
-
-        try:
-            logger.debug("executing command [" + cmdStr + "]", extra=d)
-            sys_call_return_subproc(cmdStr)
-        except:
-            print "WARN - pid:",pid,"not found"
-            raise
-
-def terminate_process(pidStack):
-    while ( len(pidStack) > 0 ):
-        pid = pidStack.pop()
-        try:
-            os.kill(int(pid), signal.SIGTERM)
-        except:
-            print "WARN - pid:",pid,"not found"
-            raise
-
-
-def convert_keyval_to_cmd_args(configFilePathname):
-    cmdArg  = ""
-    inlines = open(configFilePathname, "r").readlines()
-    for inline in inlines:
-        line = inline.rstrip()
-        tokens = line.split('=', 1)
-
-        if (len(tokens) == 2):
-            cmdArg = cmdArg + " --" + tokens[0] + " " + tokens[1]
-        elif (len(tokens) == 1):
-            cmdArg = cmdArg + " --" + tokens[0]
-        else:
-            print "ERROR: unexpected arguments list", line
-    return cmdArg
-
-
-def async_sys_call(cmd_str):
-    subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-
-
-def sys_call_return_subproc(cmd_str):
-    p = subprocess.Popen(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-    return p
-
-
-def remote_host_file_exists(hostname, pathname):
-    cmdStr = "ssh " + hostname + " 'ls " + pathname + "'"
-    logger.debug("executing command: [" + cmdStr + "]", extra=d)
-    subproc = sys_call_return_subproc(cmdStr)
-
-    for line in subproc.stdout.readlines():
-        if "No such file or directory" in line:
-            return False
-    return True
-
-
-def remote_host_directory_exists(hostname, path):
-    cmdStr = "ssh " + hostname + " 'ls -d " + path + "'"
-    logger.debug("executing command: [" + cmdStr + "]", extra=d)
-    subproc = sys_call_return_subproc(cmdStr)
-
-    for line in subproc.stdout.readlines():
-        if "No such file or directory" in line:
-            return False
-    return True
-
-
-def remote_host_processes_stopped(hostname):
-    cmdStr = "ssh " + hostname + \
-             " \"ps auxw | grep -v grep | grep -v Bootstrap | grep -i 'java\|run\-\|producer\|consumer\|jmxtool\|kafka' | wc -l\" 2> /dev/null"
-
-    logger.info("executing command: [" + cmdStr + "]", extra=d)
-    subproc = sys_call_return_subproc(cmdStr)
-
-    for line in subproc.stdout.readlines():
-        line = line.rstrip('\n')
-        logger.info("no. of running processes found : [" + line + "]", extra=d)
-        if line == '0':
-            return True
-    return False
-
-
-def setup_remote_hosts(systemTestEnv):
-    # sanity check on remote hosts to make sure:
-    # - all directories (eg. java_home) specified in cluster_config.json exists in all hosts
-    # - no conflicting running processes in remote hosts
-
-    aLogger.info("=================================================")
-    aLogger.info("setting up remote hosts ...")
-    aLogger.info("=================================================")
-
-    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-
-    localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
-
-    # when configuring "default" java_home, use JAVA_HOME environment variable, if exists
-    # otherwise, use the directory with the java binary
-    localJavaHome  = os.environ.get('JAVA_HOME')
-    if localJavaHome is not None:
-        localJavaBin   = localJavaHome + '/bin/java'
-    else:
-        subproc = sys_call_return_subproc("which java")
-        for line in subproc.stdout.readlines():
-            if line.startswith("which: no "):
-                logger.error("No Java binary found in local host", extra=d)
-                return False
-            else:
-                line = line.rstrip('\n')
-                localJavaBin = line
-                matchObj = re.match("(.*)\/bin\/java$", line)
-                localJavaHome = matchObj.group(1)
-
-    listIndex = -1
-    for clusterEntityConfigDict in clusterEntityConfigDictList:
-        listIndex += 1
-
-        hostname  = clusterEntityConfigDict["hostname"]
-        kafkaHome = clusterEntityConfigDict["kafka_home"]
-        javaHome  = clusterEntityConfigDict["java_home"]
-
-        if hostname == "localhost" and javaHome == "default":
-            clusterEntityConfigDictList[listIndex]["java_home"] = localJavaHome
-
-        if hostname == "localhost" and kafkaHome == "default":
-            clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome
-        if hostname == "localhost" and kafkaHome == "system_test/migration_tool_testsuite/0.7":
-            clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome + "/system_test/migration_tool_testsuite/0.7"
-
-        kafkaHome = clusterEntityConfigDict["kafka_home"]
-        javaHome  = clusterEntityConfigDict["java_home"]
-
-        logger.debug("checking java binary [" + localJavaBin + "] in host [" + hostname + "]", extra=d)
-        if not remote_host_directory_exists(hostname, javaHome):
-            logger.error("Directory not found: [" + javaHome + "] in host [" + hostname + "]", extra=d)
-            return False
-
-        logger.debug("checking directory [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
-        if not remote_host_directory_exists(hostname, kafkaHome):
-            logger.info("Directory not found: [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
-            if hostname == "localhost":
-                return False
-            else:
-                localKafkaSourcePath = systemTestEnv.SYSTEM_TEST_BASE_DIR + "/.."
-                logger.debug("copying local copy of [" + localKafkaSourcePath + "] to " + hostname + ":" + kafkaHome, extra=d)
-                copy_source_to_remote_hosts(hostname, localKafkaSourcePath, kafkaHome)
-
-    return True
-
-def copy_source_to_remote_hosts(hostname, sourceDir, destDir):
-
-    cmdStr = "rsync -avz --delete-before " + sourceDir + "/ " + hostname + ":" + destDir
-    logger.info("executing command [" + cmdStr + "]", extra=d)
-    subproc = sys_call_return_subproc(cmdStr)
-
-    for line in subproc.stdout.readlines():
-        dummyVar = 1
-
-
-def remove_kafka_home_dir_at_remote_hosts(hostname, kafkaHome):
-
-    if remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
-        cmdStr  = "ssh " + hostname + " 'chmod -R 777 " + kafkaHome + "'"
-        logger.info("executing command [" + cmdStr + "]", extra=d)
-        sys_call(cmdStr)
-
-        cmdStr  = "ssh " + hostname + " 'rm -rf " + kafkaHome + "'"
-        logger.info("executing command [" + cmdStr + "]", extra=d)
-        #sys_call(cmdStr)
-    else:
-        logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
-        logger.warn("check config file: system_test/cluster_config.properties", extra=d)
-        logger.warn("aborting test...", extra=d)
-        sys.exit(1)
-
-def get_md5_for_file(filePathName, blockSize=8192):
-    md5 = hashlib.md5()
-    f   = open(filePathName, 'rb')
-
-    while True:
-        data = f.read(blockSize)
-        if not data:
-            break
-        md5.update(data)
-    return md5.digest()
-
-def load_cluster_config(clusterConfigPathName, clusterEntityConfigDictList):
-    # empty the list
-    clusterEntityConfigDictList[:] = []
-
-    # retrieve each entity's data from cluster config json file
-    # as "dict" and enter them into a "list"
-    jsonFileContent = open(clusterConfigPathName, "r").read()
-    jsonData        = json.loads(jsonFileContent)
-    for key, cfgList in jsonData.items():
-        if key == "cluster_config":
-            for cfg in cfgList:
-                clusterEntityConfigDictList.append(cfg)
-
-def setup_remote_hosts_with_testcase_level_cluster_config(systemTestEnv, testCasePathName):
-    # =======================================================================
-    # starting a new testcase, check for local cluster_config.json
-    # =======================================================================
-    # 1. if there is a xxxx_testsuite/testcase_xxxx/cluster_config.json
-    #    => load it into systemTestEnv.clusterEntityConfigDictList
-    # 2. if there is NO testcase_xxxx/cluster_config.json but has a xxxx_testsuite/cluster_config.json
-    #    => retore systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite
-    # 3. if there is NO testcase_xxxx/cluster_config.json NOR xxxx_testsuite/cluster_config.json
-    #    => restore system_test/cluster_config.json
-
-    testCaseLevelClusterConfigPathName = testCasePathName + "/cluster_config.json"
-
-    if os.path.isfile(testCaseLevelClusterConfigPathName):
-        # if there is a cluster_config.json in this directory, load it and use it for this testsuite
-        logger.info("found a new cluster_config : " + testCaseLevelClusterConfigPathName, extra=d)
-
-        # empty the current cluster config list
-        systemTestEnv.clusterEntityConfigDictList[:] = []
-
-        # load the cluster config for this testcase level
-        load_cluster_config(testCaseLevelClusterConfigPathName, systemTestEnv.clusterEntityConfigDictList)
-
-        # back up this testcase level cluster config
-        systemTestEnv.clusterEntityConfigDictListLastFoundInTestCase = copy.deepcopy(systemTestEnv.clusterEntityConfigDictList)
-
-    elif len(systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite) > 0:
-        # if there is NO testcase_xxxx/cluster_config.json, but has a xxxx_testsuite/cluster_config.json
-        # => restore the config in xxxx_testsuite/cluster_config.json
-
-        # empty the current cluster config list
-        systemTestEnv.clusterEntityConfigDictList[:] = []
-
-        # restore the system_test/cluster_config.json
-        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite)
-
-    else:
-        # if there is NONE, restore the config in system_test/cluster_config.json
-
-        # empty the current cluster config list
-        systemTestEnv.clusterEntityConfigDictList[:] = []
-
-        # restore the system_test/cluster_config.json
-        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListInSystemTestLevel)
-
-    # set up remote hosts
-    if not setup_remote_hosts(systemTestEnv):
-        logger.error("Remote hosts sanity check failed. Aborting test ...", extra=d)
-        print
-        sys.exit(1)
-    print
-
-def setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testModulePathName):
-    # =======================================================================
-    # starting a new testsuite, check for local cluster_config.json:
-    # =======================================================================
-    # 1. if there is a xxxx_testsuite/cluster_config.son
-    #    => load it into systemTestEnv.clusterEntityConfigDictList
-    # 2. if there is NO xxxx_testsuite/cluster_config.son
-    #    => restore system_test/cluster_config.json
-
-    testSuiteLevelClusterConfigPathName = testModulePathName + "/cluster_config.json"
-
-    if os.path.isfile(testSuiteLevelClusterConfigPathName):
-        # if there is a cluster_config.json in this directory, load it and use it for this testsuite
-        logger.info("found a new cluster_config : " + testSuiteLevelClusterConfigPathName, extra=d)
-
-        # empty the current cluster config list
-        systemTestEnv.clusterEntityConfigDictList[:] = []
-
-        # load the cluster config for this testsuite level
-        load_cluster_config(testSuiteLevelClusterConfigPathName, systemTestEnv.clusterEntityConfigDictList)
-
-        # back up this testsuite level cluster config
-        systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite = copy.deepcopy(systemTestEnv.clusterEntityConfigDictList)
-
-    else:
-        # if there is NONE, restore the config in system_test/cluster_config.json
-
-        # empty the last testsuite level cluster config list
-        systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite[:] = []
-
-        # empty the current cluster config list
-        systemTestEnv.clusterEntityConfigDictList[:] = []
-
-        # restore the system_test/cluster_config.json
-        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListInSystemTestLevel)
-
-    # set up remote hosts
-    if not setup_remote_hosts(systemTestEnv):
-        logger.error("Remote hosts sanity check failed. Aborting test ...", extra=d)
-        print
-        sys.exit(1)
-    print
-
-# =================================================
-# lists_diff_count
-# - find the no. of different items in both lists
-# - both lists need not be sorted
-# - input lists won't be changed
-# =================================================
-def lists_diff_count(a, b):
-    c = list(b)
-    d = []
-    for item in a:
-        try:
-            c.remove(item)
-        except:
-            d.append(item)
-
-    if len(d) > 0:
-        print "#### Mismatch MessageID"
-        print d
-
-    return len(c) + len(d)
-
-# =================================================
-# subtract_list
-# - subtract items in listToSubtract from mainList
-#   and return the resulting list
-# - both lists need not be sorted
-# - input lists won't be changed
-# =================================================
-def subtract_list(mainList, listToSubtract):
-    remainingList = list(mainList)
-    for item in listToSubtract:
-        try:
-            remainingList.remove(item)
-        except:
-            pass
-    return remainingList
-
-# =================================================
-# diff_lists
-# - find the diff of 2 lists and return the 
-#   total no. of mismatch from both lists
-# - diff of both lists includes:
-#   - no. of items mismatch
-#   - ordering of the items
-#
-# sample lists:
-# a = ['8','4','3','2','1']
-# b = ['8','3','4','2','1']
-#
-# difflib will return the following:
-#   8
-# + 3
-#   4
-# - 3
-#   2
-#   1
-#
-# diff_lists(a,b) returns 2 and prints the following:
-# #### only in seq 2 :  + 3
-# #### only in seq 1 :  - 3
-# =================================================
-def diff_lists(a, b):
-    mismatchCount = 0
-    d = difflib.Differ()
-    diff = d.compare(a,b)
-
-    for item in diff:
-        result = item[0:1].strip()
-        if len(result) > 0:
-            mismatchCount += 1
-            if '-' in result:
-                logger.debug("#### only in seq 1 : " + item, extra=d)
-            elif '+' in result:
-                logger.debug("#### only in seq 2 : " + item, extra=d)
-
-    return mismatchCount
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/d50499a0/system_test/utils/testcase_env.py
----------------------------------------------------------------------
diff --git a/system_test/utils/testcase_env.py b/system_test/utils/testcase_env.py
deleted file mode 100644
index 1d2fb57..0000000
--- a/system_test/utils/testcase_env.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#!/usr/bin/env python
-
-# ===================================
-# testcase_env.py
-# ===================================
-
-import json
-import os
-import sys
-import thread
-
-import system_test_utils
-
-class TestcaseEnv():
-    def __init__(self, systemTestEnv, classInstance):
-        self.systemTestEnv    = systemTestEnv
-
-        # ================================
-        # Generic testcase environment
-        # ================================
-
-        # dictionary of entity_id to ppid for Zookeeper entities
-        # key: entity_id
-        # val: ppid of Zookeeper associated to that entity_id
-        # { 0: 12345, 1: 12389, ... }
-        self.entityZkParentPidDict = {}
-
-        # dictionary of entity_id to ppid for broker entities
-        # key: entity_id
-        # val: ppid of broker associated to that entity_id
-        # { 0: 12345, 1: 12389, ... }
-        self.entityBrokerParentPidDict = {}
-
-        # dictionary of entity_id to ppid for mirror-maker entities
-        # key: entity_id
-        # val: ppid of broker associated to that entity_id
-        # { 0: 12345, 1: 12389, ... }
-        self.entityMirrorMakerParentPidDict = {}
-
-        # dictionary of entity_id to ppid for console-consumer entities
-        # key: entity_id
-        # val: ppid of console consumer associated to that entity_id
-        # { 0: 12345, 1: 12389, ... }
-        self.entityConsoleConsumerParentPidDict = {}
-
-        # dictionary of entity_id to ppid for migration tool entities
-        # key: entity_id
-        # val: ppid of broker associated to that entity_id
-        # { 0: 12345, 1: 12389, ... }
-        self.entityMigrationToolParentPidDict = {}
-
-        # dictionary of entity_id to list of JMX ppid
-        # key: entity_id
-        # val: list of JMX ppid associated to that entity_id
-        # { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
-        self.entityJmxParentPidDict = {}
-
-        # dictionary of hostname-topic-ppid for consumer
-        # key: hostname
-        # val: dict of topic-ppid
-        # { host1: { test1 : 12345 }, host1: { test2 : 12389 }, ... }
-        self.consumerHostParentPidDict = {}
-
-        # dictionary of hostname-topic-ppid for producer
-        # key: hostname
-        # val: dict of topic-ppid
-        # { host1: { test1 : 12345 }, host1: { test2 : 12389 }, ... }
-        self.producerHostParentPidDict = {}
-
-        # list of testcase configs
-        self.testcaseConfigsList = []
-
-        # dictionary to keep track of testcase arguments such as replica_factor, num_partition
-        self.testcaseArgumentsDict = {}
-
-
-        # gather the test case related info and add to an SystemTestEnv object
-        self.testcaseResultsDict = {}
-        self.testcaseResultsDict["_test_class_name"] = classInstance.__class__.__name__
-        self.testcaseResultsDict["_test_case_name"]  = ""
-        self.validationStatusDict                      = {}
-        self.testcaseResultsDict["validation_status"]  = self.validationStatusDict
-        self.systemTestEnv.systemTestResultsList.append(self.testcaseResultsDict)
-
-        # FIXME: in a distributed environement, kafkaBaseDir could be different in individual host
-        #        => TBD
-        self.kafkaBaseDir      = ""
-
-        self.systemTestBaseDir = systemTestEnv.SYSTEM_TEST_BASE_DIR
-
-        # to be initialized in the Test Module
-        self.testSuiteBaseDir      = ""
-        self.testCaseBaseDir       = ""
-        self.testCaseLogsDir       = ""
-        self.testCaseDashboardsDir = ""
-        self.testcasePropJsonPathName = ""
-        self.testcaseNonEntityDataDict = {}
-
-        # ================================
-        # dictionary to keep track of
-        # user-defined environment variables
-        # ================================
-        # LEADER_ELECTION_COMPLETED_MSG = "completed the leader state transition"
-        # REGX_LEADER_ELECTION_PATTERN  = "\[(.*?)\] .* Broker (.*?) " + \
-        #                            LEADER_ELECTION_COMPLETED_MSG + \
-        #                            " for topic (.*?) partition (.*?) \(.*"
-        # zkConnectStr = ""
-        # consumerLogPathName    = ""
-        # consumerConfigPathName = ""
-        # producerLogPathName    = ""
-        # producerConfigPathName = ""
-        self.userDefinedEnvVarDict = {}
-
-        # Lock object for producer threads synchronization
-        self.lock = thread.allocate_lock()
-
-        self.numProducerThreadsRunning = 0
-
-        # to be used when validating data match - these variables will be
-        # updated by kafka_system_test_utils.start_producer_in_thread
-        self.producerTopicsString = ""
-        self.consumerTopicsString = ""
-
-    def initWithKnownTestCasePathName(self, testCasePathName):
-        testcaseDirName = os.path.basename(testCasePathName)
-        self.testcaseResultsDict["_test_case_name"] = testcaseDirName
-        self.testCaseBaseDir = testCasePathName
-        self.testCaseLogsDir = self.testCaseBaseDir + "/logs"
-        self.testCaseDashboardsDir = self.testCaseBaseDir + "/dashboards"
-
-        # find testcase properties json file
-        self.testcasePropJsonPathName = system_test_utils.get_testcase_prop_json_pathname(testCasePathName)
-
-        # get the dictionary that contains the testcase arguments and description
-        self.testcaseNonEntityDataDict = system_test_utils.get_json_dict_data(self.testcasePropJsonPathName)
-
-    def printTestCaseDescription(self, testcaseDirName):
-        testcaseDescription = ""
-        for k,v in self.testcaseNonEntityDataDict.items():
-            if ( k == "description" ):
-                testcaseDescription = v
-
-        print "\n"
-        print "======================================================================================="
-        print "Test Case Name :", testcaseDirName
-        print "======================================================================================="
-        print "Description    :"
-        for step in sorted(testcaseDescription.iterkeys()):
-            print "   ", step, ":", testcaseDescription[step]
-        print "======================================================================================="
-        print "Test Case Args :"
-        for k,v in self.testcaseArgumentsDict.items():
-            print "   ", k, " : ", v
-            self.testcaseResultsDict["arg : " + k] = v
-        print "======================================================================================="
-
-


Mime
View raw message