kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1396687 [4/4] - in /incubator/kafka/branches/0.8/system_test: ./ mirror_maker_testsuite/ mirror_maker_testsuite/config/ mirror_maker_testsuite/testcase_5001/ replication_testsuite/ replication_testsuite/config/ replication_testsuite/testca...
Date Wed, 10 Oct 2012 16:57:01 GMT
Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1396687&r1=1396686&r2=1396687&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Wed Oct 10 16:56:57 2012
@@ -26,6 +26,7 @@ import inspect
 import json
 import logging
 import os
+import pprint
 import re
 import subprocess
 import sys
@@ -199,25 +200,30 @@ def init_entity_props(systemTestEnv, tes
     testcaseConfigsList = testcaseEnv.testcaseConfigsList
     testcasePathName    = testcaseEnv.testCaseBaseDir
 
-    # consumer config / log files location
-    consEntityIdList   = system_test_utils.get_data_from_list_of_dicts( \
+    try:
+        # consumer config / log files location
+        consEntityIdList   = system_test_utils.get_data_from_list_of_dicts( \
                              clusterConfigsList, "role", "console_consumer", "entity_id")
-    consLogList        = system_test_utils.get_data_from_list_of_dicts( \
+        consLogList        = system_test_utils.get_data_from_list_of_dicts( \
                              testcaseConfigsList, "entity_id", consEntityIdList[0], "log_filename")
-    consLogPathname    = testcasePathName + "/logs/" + consLogList[0]
-    consCfgList        = system_test_utils.get_data_from_list_of_dicts( \
+        consLogPathname    = testcasePathName + "/logs/" + consLogList[0]
+        consCfgList        = system_test_utils.get_data_from_list_of_dicts( \
                              testcaseConfigsList, "entity_id", consEntityIdList[0], "config_filename")
-    consCfgPathname    = testcasePathName + "/config/" + consCfgList[0]
+        consCfgPathname    = testcasePathName + "/config/" + consCfgList[0]
 
-    # producer config / log files location
-    prodEntityIdList   = system_test_utils.get_data_from_list_of_dicts( \
+        # producer config / log files location
+        prodEntityIdList   = system_test_utils.get_data_from_list_of_dicts( \
                              clusterConfigsList, "role", "producer_performance", "entity_id")
-    prodLogList        = system_test_utils.get_data_from_list_of_dicts( \
+        prodLogList        = system_test_utils.get_data_from_list_of_dicts( \
                              testcaseConfigsList, "entity_id", prodEntityIdList[0], "log_filename")
-    prodLogPathname    = testcasePathName + "/logs/" + prodLogList[0]
-    prodCfgList        = system_test_utils.get_data_from_list_of_dicts( \
+        prodLogPathname    = testcasePathName + "/logs/" + prodLogList[0]
+        prodCfgList        = system_test_utils.get_data_from_list_of_dicts( \
                              testcaseConfigsList, "entity_id", prodEntityIdList[0], "config_filename")
-    prodCfgPathname    = testcasePathName + "/config/" + prodCfgList[0]
+        prodCfgPathname    = testcasePathName + "/config/" + prodCfgList[0]
+    except:
+        logger.error("Failed to initialize entity config/log path names: possibly mismatched " \
+                    + "number of entities in cluster_config.json & testcase_n_properties.json", extra=d)
+        raise
 
     testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"]    = consLogPathname
     testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"] = consCfgPathname
@@ -225,7 +231,7 @@ def init_entity_props(systemTestEnv, tes
     testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"] = prodCfgPathname
 
 
-def copy_file_with_dict_values(srcFile, destFile, dictObj):
+def copy_file_with_dict_values(srcFile, destFile, dictObj, keyValToAddDict):
     infile  = open(srcFile, "r")
     inlines = infile.readlines()
     infile.close()
@@ -236,8 +242,13 @@ def copy_file_with_dict_values(srcFile, 
             if (line.startswith(key + "=")):
                 line = key + "=" + dictObj[key] + "\n"
         outfile.write(line)
-    outfile.close()
 
+    if (keyValToAddDict is not None):
+        for key in sorted(keyValToAddDict.iterkeys()):
+            line = key + "=" + keyValToAddDict[key] + "\n"
+            outfile.write(line)
+
+    outfile.close()
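
The new fourth argument, keyValToAddDict, appends brand-new key=value lines after the template's existing keys have been rewritten in place. A minimal usage sketch, with hypothetical file names and property values, mirroring how the zookeeper branch further down calls it:

    overrides = {"clientPort": "2181"}            # rewrites an existing clientPort=... line
    extras    = {"server.1": "host1:2180:2182",   # appended at the end, sorted by key
                 "server.2": "host2:2180:2182"}
    copy_file_with_dict_values("config/zookeeper.properties",
                               "testcase_5001/config/zookeeper_0.properties",
                               overrides, extras)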
 
 def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
     logger.info("calling generate_properties_files", extra=d)
@@ -253,50 +264,135 @@ def generate_overriden_props_files(tests
 
     # loop through all zookeepers (if more than 1) to retrieve host and clientPort
     # to construct a zk.connect str for broker in the form of:
-    # zk.connect=<host1>:<port2>,<host2>:<port2>
-    zkConnectStr = ""
+    # zk.connect=<host1>:<port1>,<host2>:<port2>,...
+    testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]        = ""
+    testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]        = ""
+    testcaseEnv.userDefinedEnvVarDict["sourceZkEntityIdList"]      = []
+    testcaseEnv.userDefinedEnvVarDict["targetZkEntityIdList"]      = []
+    testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"]      = {}
+    testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"]      = {}
+    testcaseEnv.userDefinedEnvVarDict["sourceBrokerEntityIdList"]  = []
+    testcaseEnv.userDefinedEnvVarDict["targetBrokerEntityIdList"]  = []
+    testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"]          = ""
+    testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]          = ""
+
+    # update zookeeper cluster info into "testcaseEnv.userDefinedEnvVarDict"
     zkDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
+
     for zkDict in zkDictList:
         entityID       = zkDict["entity_id"]
         hostname       = zkDict["hostname"]
+        clusterName    = zkDict["cluster_name"]
         clientPortList = system_test_utils.get_data_from_list_of_dicts(tcConfigsList, "entity_id", entityID, "clientPort")
         clientPort     = clientPortList[0]
 
-        if ( zkConnectStr.__len__() == 0 ):
-            zkConnectStr = hostname + ":" + clientPort
+        if clusterName == "source":
+            # update source cluster zookeeper entities
+            testcaseEnv.userDefinedEnvVarDict["sourceZkEntityIdList"].append(entityID)
+            if ( len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) == 0 ):
+                testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] = hostname + ":" + clientPort
+            else:
+                testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] += "," + hostname + ":" + clientPort
+
+            # generate these strings for zookeeper config:
+            # server.1=host1:2180:2182
+            # server.2=host2:2180:2182
+            zkClusterSize = len(testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"])
+            zkClusterId   = str(zkClusterSize + 1)
+            key           = "server." + zkClusterId
+            val           = hostname + ":" + str(int(clientPort) - 1) + ":" + str(int(clientPort) + 1)
+            testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"][key] = val
+
+        elif clusterName == "target":
+            # update target cluster zookeeper entities
+            testcaseEnv.userDefinedEnvVarDict["targetZkEntityIdList"].append(entityID)
+            if ( len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) == 0 ):
+                testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] = hostname + ":" + clientPort
+            else:
+                testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] += "," + hostname + ":" + clientPort
+
+            # generate these strings for zookeeper config:
+            # server.1=host1:2180:2182
+            # server.2=host2:2180:2182
+            zkClusterSize = len(testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"])
+            zkClusterId   = str(zkClusterSize + 1)
+            key           = "server." + zkClusterId
+            val           = hostname + ":" + str(int(clientPort) - 1) + ":" + str(int(clientPort) + 1)
+            testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"][key] = val
+
         else:
-            zkConnectStr = zkConnectStr + "," + hostname + ":" + clientPort
+            logger.error("Unknown cluster name: " + clusterName)
+            sys.exit(1)
+
+    # update broker cluster info into "testcaseEnv.userDefinedEnvVarDict"
+    brokerDictList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "broker")
+    for brokerDict in brokerDictList:
+        entityID       = brokerDict["entity_id"]
+        hostname       = brokerDict["hostname"]
+        clusterName    = brokerDict["cluster_name"]
+        portList       = system_test_utils.get_data_from_list_of_dicts(tcConfigsList, "entity_id", entityID, "port")
+        port           = portList[0]
+
+        if clusterName == "source":
+            if ( len(testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"]) == 0 ):
+                testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"] = hostname + ":" + port
+            else:
+                testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"] += "," + hostname + ":" + port
+        elif clusterName == "target":
+            if ( len(testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]) == 0 ):
+                testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] = hostname + ":" + port
+            else:
+                testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] += "," + hostname + ":" + port
+        else:
+            logger.error("Unknown cluster name: " + clusterName)
+            sys.exit(1)
 
     # for each entity in the cluster config
     for clusterCfg in clusterConfigsList:
         cl_entity_id = clusterCfg["entity_id"]
 
+        # loop through testcase config list 'tcConfigsList' for a matching cluster entity_id
         for tcCfg in tcConfigsList:
             if (tcCfg["entity_id"] == cl_entity_id):
 
                 # copy the associated .properties template, update values, write to testcase_<xxx>/config
 
                 if ( clusterCfg["role"] == "broker" ):
-                    tcCfg["zk.connect"] = zkConnectStr
-                    copy_file_with_dict_values(cfgTemplatePathname + "/server.properties", \
-                                               cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
+                    if clusterCfg["cluster_name"] == "source":
+                        tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+                    elif clusterCfg["cluster_name"] == "target":
+                        tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
+                    else:
+                        logger.error("Unknown cluster name: " + clusterName)
+                        sys.exit(1)
 
-                elif ( clusterCfg["role"] == "zookeeper"):
-                    copy_file_with_dict_values(cfgTemplatePathname + "/zookeeper.properties", \
-                                               cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
+                    copy_file_with_dict_values(cfgTemplatePathname + "/server.properties",
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, None)
 
-                elif ( clusterCfg["role"] == "producer_performance"):
-                    #tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
-                    copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties", \
-                                               cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
-
-                elif ( clusterCfg["role"] == "console_consumer"):
-                    tcCfg["zookeeper"] = zkConnectStr
-                    copy_file_with_dict_values(cfgTemplatePathname + "/console_consumer.properties", \
-                                               cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
+                elif ( clusterCfg["role"] == "zookeeper"):
+                    if clusterCfg["cluster_name"] == "source":
+                        copy_file_with_dict_values(cfgTemplatePathname + "/zookeeper.properties",
+                            cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg,
+                            testcaseEnv.userDefinedEnvVarDict["sourceZkHostPortDict"])
+                    elif clusterCfg["cluster_name"] == "target":
+                        copy_file_with_dict_values(cfgTemplatePathname + "/zookeeper.properties",
+                            cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg,
+                            testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"])
+                    else:
+                        logger.error("Unknown cluster name: " + clusterName)
+                        sys.exit(1)
+
+                elif ( clusterCfg["role"] == "mirror_maker"):
+                    tcCfg["broker.list"] = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]
+                    copy_file_with_dict_values(cfgTemplatePathname + "/mirror_producer.properties",
+                        cfgDestPathname + "/" + tcCfg["mirror_producer_config_filename"], tcCfg, None)
+
+                    # update zk.connect with the zk entities specified in cluster_config.json
+                    tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+                    copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties",
+                        cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None)
                 else:
-                    print "    => ", tcCfg
-                    print "UNHANDLED key"
+                    logger.debug("UNHANDLED role " + clusterCfg["role"], extra=d)
 
     # scp updated config files to remote hosts
     scp_file_to_remote_host(clusterConfigsList, testcaseEnv)
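
Taken together, the loops above build, per cluster, a comma-separated zk.connect string plus a server.N quorum dictionary whose ports are derived from clientPort. A minimal standalone sketch with made-up hosts and ports:

    zkConnectStr   = ""
    zkHostPortDict = {}
    for hostname, clientPort in [("host1", "2181"), ("host2", "2181")]:
        if len(zkConnectStr) == 0:
            zkConnectStr = hostname + ":" + clientPort
        else:
            zkConnectStr += "," + hostname + ":" + clientPort
        # quorum ports are derived as clientPort - 1 and clientPort + 1
        serverId = str(len(zkHostPortDict) + 1)
        zkHostPortDict["server." + serverId] = \
            hostname + ":" + str(int(clientPort) - 1) + ":" + str(int(clientPort) + 1)
    # zkConnectStr   -> "host1:2181,host2:2181"
    # zkHostPortDict -> {"server.1": "host1:2180:2182", "server.2": "host2:2180:2182"}
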
@@ -318,24 +414,63 @@ def scp_file_to_remote_host(clusterEntit
 def start_zookeepers(systemTestEnv, testcaseEnv):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
-    zkEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
-                         clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
+    zkEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+        clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
 
     for zkEntityId in zkEntityIdList:
-        start_entity_in_background(systemTestEnv, testcaseEnv, zkEntityId)
+        configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "zookeeper", zkEntityId, "config")
+        configFile     = system_test_utils.get_data_by_lookup_keyval(
+                             testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "config_filename")
+        clientPort     = system_test_utils.get_data_by_lookup_keyval(
+                             testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "clientPort")
+        dataDir        = system_test_utils.get_data_by_lookup_keyval(
+                             testcaseEnv.testcaseConfigsList, "entity_id", zkEntityId, "dataDir")
+        hostname       = system_test_utils.get_data_by_lookup_keyval(
+                             clusterEntityConfigDictList, "entity_id", zkEntityId, "hostname")
+        minusOnePort   = str(int(clientPort) - 1)
+        plusOnePort    = str(int(clientPort) + 1)
+
+        # read configFile to find out the id of the zk and create the file "myid"
+        infile  = open(configPathName + "/" + configFile, "r")
+        inlines = infile.readlines()
+        infile.close()
+
+        for line in inlines:
+            if line.startswith("server.") and hostname + ":" + minusOnePort + ":" + plusOnePort in line:
+                # server.1=host1:2187:2189
+                matchObj    = re.match("server\.(.*?)=.*", line)
+                zkServerId  = matchObj.group(1)
 
+        cmdStr = "ssh " + hostname + " 'mkdir -p " + dataDir + "; echo " + zkServerId + " > " + dataDir + "/myid'"
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+        for line in subproc.stdout.readlines():
+            pass    # dummy loop to wait until the remote command completes
+
+        time.sleep(2)
+        start_entity_in_background(systemTestEnv, testcaseEnv, zkEntityId)
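
The myid logic above recovers this host's server id by matching its host:quorumPort:electionPort string against the server.N lines appended into zookeeper.properties, then echoes that id into <dataDir>/myid on the remote host. A minimal sketch, assuming a matching server.N line exists (otherwise zkServerId stays unbound):

    import re

    inlines = ["clientPort=2188\n",
               "server.1=host1:2187:2189\n",
               "server.2=host2:2187:2189\n"]            # made-up config content
    hostname, minusOnePort, plusOnePort = "host2", "2187", "2189"

    for line in inlines:
        if line.startswith("server.") and hostname + ":" + minusOnePort + ":" + plusOnePort in line:
            zkServerId = re.match("server\.(.*?)=.*", line).group(1)
    # zkServerId -> "2"; then: ssh host2 'mkdir -p <dataDir>; echo 2 > <dataDir>/myid'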
 
 def start_brokers(systemTestEnv, testcaseEnv):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
-    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
-                             clusterEntityConfigDictList, "role", "broker", "entity_id")
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+        clusterEntityConfigDictList, "role", "broker", "entity_id")
+
+    for brokerEntityId in brokerEntityIdList:
+        start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
+
+
+def start_mirror_makers(systemTestEnv, testcaseEnv):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+        clusterEntityConfigDictList, "role", "mirror_maker", "entity_id")
 
     for brokerEntityId in brokerEntityIdList:
         start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
 
 
-def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv):
+def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict):
 
     logger.info("looking up broker shutdown...", extra=d)
 
@@ -344,22 +479,19 @@ def get_broker_shutdown_log_line(systemT
     shutdownBrokerDict = {} 
 
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
                              clusterEntityConfigDictList, "role", "broker", "entity_id")
 
     for brokerEntityId in brokerEntityIdList:
 
-        hostname   = system_test_utils.get_data_by_lookup_keyval( \
+        hostname   = system_test_utils.get_data_by_lookup_keyval( 
                          clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
-        logFile    = system_test_utils.get_data_by_lookup_keyval( \
+        logFile    = system_test_utils.get_data_by_lookup_keyval( 
                          testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
 
-        shutdownBrokerDict["entity_id"] = brokerEntityId
-        shutdownBrokerDict["hostname"]  = hostname
-
         logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
         cmdStrList = ["ssh " + hostname,
-                      "\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] + "' ",
+                      "\"grep -i -h '" + leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] + "' ",
                       logPathName + "/" + logFile + " | ",
                       "sort | tail -1\""]
         cmdStr     = " ".join(cmdStrList)
@@ -370,30 +502,35 @@ def get_broker_shutdown_log_line(systemT
 
             line = line.rstrip('\n')
 
-            if testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] in line:
+            if leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] in line:
                 logger.debug("found the log line : " + line, extra=d)
                 try:
-                    matchObj    = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"], line)
+                    matchObj    = re.match(leaderAttributesDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"], line)
                     datetimeStr = matchObj.group(1)
                     datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
                     unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
                     #print "{0:.3f}".format(unixTs)
-                    shutdownBrokerDict["timestamp"] = unixTs
-                    shutdownBrokerDict["brokerid"]  = matchObj.group(2)
-                    logger.debug("brokerid: [" + shutdownBrokerDict["brokerid"] + "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d)
-                    return shutdownBrokerDict
+
+                    # update shutdownBrokerDict when
+                    # 1. shutdownBrokerDict has no logline entry
+                    # 2. shutdownBrokerDict has an existing logline entry but this logline's timestamp is more recent
+                    if (len(shutdownBrokerDict) > 0 and shutdownBrokerDict["timestamp"] < unixTs) or (len(shutdownBrokerDict) == 0):
+                        shutdownBrokerDict["timestamp"] = unixTs
+                        shutdownBrokerDict["brokerid"]  = matchObj.group(2)
+                        shutdownBrokerDict["hostname"]  = hostname
+                        shutdownBrokerDict["entity_id"] = brokerEntityId
+                    logger.debug("brokerid: [" + shutdownBrokerDict["brokerid"] + \
+                        "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d)
                 except:
                     logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
                     raise
-            #else:
-            #    logger.debug("unmatched line found [" + line + "]", extra=d)
 
     return shutdownBrokerDict
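
Both log scrapers reduce a log4j timestamp like "2012-10-10 16:56:57,123" to a unix epoch float so that shutdown and election times can later be subtracted. A minimal sketch of that conversion, with a made-up timestamp:

    import time
    from datetime import datetime

    datetimeStr = "2012-10-10 16:56:57,123"
    datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
    # mktime() only has whole-second resolution, so the microseconds
    # parsed by %f are added back as a fraction
    unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6 * datetimeObj.microsecond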
 
 
-def get_leader_elected_log_line(systemTestEnv, testcaseEnv):
+def get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict):
 
-    logger.info("looking up leader...", extra=d)
+    logger.debug("looking up leader...", extra=d)
 
     # keep track of leader related data in this dict such as broker id,
     # entity id and timestamp and return it to the caller function
@@ -412,7 +549,7 @@ def get_leader_elected_log_line(systemTe
 
         logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
         cmdStrList = ["ssh " + hostname,
-                      "\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
+                      "\"grep -i -h '" + leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
                       logPathName + "/" + logFile + " | ",
                       "sort | tail -1\""]
         cmdStr     = " ".join(cmdStrList)
@@ -423,10 +560,10 @@ def get_leader_elected_log_line(systemTe
 
             line = line.rstrip('\n')
 
-            if testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] in line:
+            if leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] in line:
                 logger.debug("found the log line : " + line, extra=d)
                 try:
-                    matchObj    = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"], line)
+                    matchObj    = re.match(leaderAttributesDict["REGX_LEADER_ELECTION_PATTERN"], line)
                     datetimeStr = matchObj.group(1)
                     datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
                     unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
@@ -462,6 +599,7 @@ def start_entity_in_background(systemTes
     kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
     javaHome  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "java_home")
     jmxPort   = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
+    clusterName = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "cluster_name")
 
     # testcase configurations:
     testcaseConfigsList = testcaseEnv.testcaseConfigsList
@@ -469,6 +607,11 @@ def start_entity_in_background(systemTes
     configFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "config_filename")
     logFile    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename")
 
+    mmConsumerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId,
+                           "mirror_consumer_config_filename")
+    mmProducerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId,
+                           "mirror_producer_config_filename")
+
     logger.info("starting " + role + " in host [" + hostname + "] on client port [" + clientPort + "]", extra=d)
 
     configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "config")
@@ -483,13 +626,6 @@ def start_entity_in_background(systemTes
                   logPathName + "/" + logFile + " & echo pid:$! > ",
                   logPathName + "/entity_" + entityId + "_pid'"]
 
-        # construct zk.connect str and update it to testcaseEnv.userDefinedEnvVarDict.zkConnectStr
-        if ( len(testcaseEnv.userDefinedEnvVarDict["zkConnectStr"]) > 0 ):
-            testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = \
-                testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] + "," + hostname + ":" + clientPort
-        else:
-            testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = hostname + ":" + clientPort
-
     elif role == "broker":
         cmdList = ["ssh " + hostname,
                   "'JAVA_HOME=" + javaHome,
@@ -499,6 +635,17 @@ def start_entity_in_background(systemTes
                   logPathName + "/" + logFile + " & echo pid:$! > ",
                   logPathName + "/entity_" + entityId + "_pid'"]
 
+    elif role == "mirror_maker":
+        cmdList = ["ssh " + hostname,
+                  "'JAVA_HOME=" + javaHome,
+                 "JMX_PORT=" + jmxPort,
+                  kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker",
+                  "--consumer.config " + configPathName + "/" + mmConsumerConfigFile,
+                  "--producer.config " + configPathName + "/" + mmProducerConfigFile,
+                  "--whitelist=\".*\" >> ",
+                  logPathName + "/" + logFile + " & echo pid:$! > ",
+                  logPathName + "/entity_" + entityId + "_pid'"]
+
     cmdStr = " ".join(cmdList)
 
     logger.debug("executing command: [" + cmdStr + "]", extra=d)
@@ -515,31 +662,35 @@ def start_entity_in_background(systemTes
             line = line.rstrip('\n')
             logger.debug("found pid line: [" + line + "]", extra=d)
             tokens = line.split(':')
-            testcaseEnv.entityParentPidDict[entityId] = tokens[1]
-            #print "\n#### testcaseEnv.entityParentPidDict ", testcaseEnv.entityParentPidDict, "\n"
+            if role == "zookeeper":
+                testcaseEnv.entityZkParentPidDict[entityId] = tokens[1]
+            elif role == "broker":
+                testcaseEnv.entityBrokerParentPidDict[entityId] = tokens[1]
+            elif role == "mirror_maker":
+                testcaseEnv.entityMirrorMakerParentPidDict[entityId] = tokens[1]
 
     time.sleep(1)
-    metrics.start_metrics_collection(hostname, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
+    if role != "mirror_maker":
+        metrics.start_metrics_collection(hostname, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
 
 def start_console_consumer(systemTestEnv, testcaseEnv):
 
-    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    clusterList = systemTestEnv.clusterEntityConfigDictList
 
-    consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
+    consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "console_consumer")
     for consumerConfig in consumerConfigList:
         host              = consumerConfig["hostname"]
         entityId          = consumerConfig["entity_id"]
         jmxPort           = consumerConfig["jmx_port"] 
-        role              = consumerConfig["role"] 
-        kafkaHome         = system_test_utils.get_data_by_lookup_keyval( \
-                                clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
-        javaHome          = system_test_utils.get_data_by_lookup_keyval( \
-                                clusterEntityConfigDictList, "entity_id", entityId, "java_home")
-        jmxPort           = system_test_utils.get_data_by_lookup_keyval( \
-                                clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
+        role              = consumerConfig["role"]
+        clusterName       = consumerConfig["cluster_name"] 
+        kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home")
+        javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home")
+        jmxPort           = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "jmx_port")
         kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
 
+
         logger.info("starting console consumer", extra=d)
 
         consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default")
@@ -547,12 +698,40 @@ def start_console_consumer(systemTestEnv
 
         testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName
 
-        commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["consumerConfigPathName"])
+        # testcase configurations:
+        testcaseList = testcaseEnv.testcaseConfigsList
+        topic     = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")
+        timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "consumer-timeout-ms")
+
+
+        formatterOption = ""
+        try:
+            formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "formatter")
+        except:
+            pass
+
+        if len(formatterOption) > 0:
+            formatterOption = " --formatter " + formatterOption + " "
+
+        zkConnectStr = ""
+        if clusterName == "source":
+            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+        elif clusterName == "target":
+            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
+        else:
+            logger.error("Invalid cluster name : " + clusterName)
+            sys.exit(1)
+
         cmdList = ["ssh " + host,
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
-                   commandArgs + " >> " + consumerLogPathName,
+                   "--zookeeper " + zkConnectStr,
+                   "--topic " + topic,
+                   "--consumer-timeout-ms " + timeoutMs,
+                   formatterOption,
+                   "--from-beginning ",
+                   " >> " + consumerLogPathName,
                    " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"]
 
         cmdStr = " ".join(cmdList)
@@ -574,7 +753,7 @@ def start_console_consumer(systemTestEnv
                 tokens = line.split(':')
                 testcaseEnv.consumerHostParentPidDict[host] = tokens[1]
 
-def start_producer_performance(systemTestEnv, testcaseEnv):
+def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client):
 
     entityConfigList     = systemTestEnv.clusterEntityConfigDictList
     testcaseConfigsList  = testcaseEnv.testcaseConfigsList
@@ -588,11 +767,6 @@ def start_producer_performance(systemTes
             entityId = entityConfig["entity_id"]
             port     = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "port")
 
-            if len(brokerListStr) == 0:
-                brokerListStr = hostname + ":" + port
-            else:
-                brokerListStr = brokerListStr + "," + hostname + ":" + port
-
     producerConfigList = system_test_utils.get_dict_from_list_of_dicts(entityConfigList, "role", "producer_performance")
     for producerConfig in producerConfigList:
         host              = producerConfig["hostname"]
@@ -600,41 +774,69 @@ def start_producer_performance(systemTes
         jmxPort           = producerConfig["jmx_port"] 
         role              = producerConfig["role"] 
 
-        thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, brokerListStr))
+        thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, kafka07Client))
+        testcaseEnv.lock.acquire()
+        testcaseEnv.numProducerThreadsRunning += 1
+        logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d)
+        time.sleep(1)
+        testcaseEnv.lock.release()
         time.sleep(1)
         metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
-def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, brokerListStr):
+def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client):
     host              = producerConfig["hostname"]
     entityId          = producerConfig["entity_id"]
     jmxPort           = producerConfig["jmx_port"] 
-    role              = producerConfig["role"] 
+    role              = producerConfig["role"]
+    clusterName       = producerConfig["cluster_name"]
     kafkaHome         = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "kafka_home")
     javaHome          = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "java_home")
     jmxPort           = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port")
     kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
 
+    # testcase configurations:
+    testcaseConfigsList = testcaseEnv.testcaseConfigsList
+    topic          = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic")
+    threads        = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "threads")
+    compCodec      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "compression-codec")
+    messageSize    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-size")
+    noMsgPerBatch  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message")
+    requestNumAcks = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "request-num-acks")
+    asyncMode      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "async")
+
+    brokerListStr  = ""
+    if clusterName == "source":
+        brokerListStr  = testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"]
+    elif clusterName == "target":
+        brokerListStr  = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]
+    else:
+        logger.error("Unknown cluster name: " + clusterName)
+        sys.exit(1)
+
     logger.info("starting producer preformance", extra=d)
 
     producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default")
     producerLogPathName = producerLogPath + "/producer_performance.log"
 
     testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName
-    commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"])
 
     counter = 0
-    noMsgPerBatch    = int(testcaseEnv.testcaseArgumentsDict["num_messages_to_produce_per_producer_call"])
     producerSleepSec = int(testcaseEnv.testcaseArgumentsDict["sleep_seconds_between_producer_calls"])
 
-    # keep calling producer until signaled by:
+    boolArgumentsStr = ""
+    if asyncMode.lower() == "true":
+        boolArgumentsStr = boolArgumentsStr + " --async"
+
+    # keep calling producer until signaled to stop by:
     # testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]
     while 1:
+        logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
         testcaseEnv.lock.acquire()
         if not testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]:
-            initMsgId = counter * noMsgPerBatch
+            initMsgId = counter * int(noMsgPerBatch)
 
-            logger.info("#### [producer thread] status of stopBackgroundProducer : [False] => producing [" + str(noMsgPerBatch) + \
-                        "] messages with starting message id : [" + str(initMsgId) + "]", extra=d)
+            logger.info("#### [producer thread] status of stopBackgroundProducer : [False] => producing [" \
+                + str(noMsgPerBatch) + "] messages with starting message id : [" + str(initMsgId) + "]", extra=d)
 
             cmdList = ["ssh " + host,
                        "'JAVA_HOME=" + javaHome,
@@ -642,8 +844,43 @@ def start_producer_in_thread(testcaseEnv
                        kafkaRunClassBin + " kafka.perf.ProducerPerformance",
                        "--broker-list " + brokerListStr,
                        "--initial-message-id " + str(initMsgId),
-                       "--messages " + str(noMsgPerBatch),
-                       commandArgs + " >> " + producerLogPathName,
+                       "--messages " + noMsgPerBatch,
+                       "--topic " + topic,
+                       "--threads " + threads,
+                       "--compression-codec " + compCodec,
+                       "--message-size " + messageSize,
+                       "--request-num-acks " + requestNumAcks,
+                       boolArgumentsStr,
+                       " >> " + producerLogPathName,
+                       " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
+
+            if kafka07Client:
+                cmdList[:] = []
+
+                brokerInfoStr = ""
+                tokenList = brokerListStr.split(',')
+                index = 1
+                for token in tokenList:
+                    if len(brokerInfoStr) == 0:
+                        brokerInfoStr = str(index) + ":" + token
+                    else:
+                        brokerInfoStr += "," + str(index) + ":" + token
+                    index += 1
+
+                brokerInfoStr = "broker.list=" + brokerInfoStr
+
+                cmdList = ["ssh " + host,
+                       "'JAVA_HOME=" + javaHome,
+                       "JMX_PORT=" + jmxPort,
+                       kafkaRunClassBin + " kafka.perf.ProducerPerformance",
+                       "--brokerinfo " + brokerInfoStr,
+                       "--messages " + noMsgPerBatch,
+                       "--topic " + topic,
+                       "--threads " + threads,
+                       "--compression-codec " + compCodec,
+                       "--message-size " + messageSize,
+                       "--vary-message-size --async",
+                       " >> " + producerLogPathName,
                        " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
 
             cmdStr = " ".join(cmdList)
@@ -653,20 +890,29 @@ def start_producer_in_thread(testcaseEnv
             for line in subproc.stdout.readlines():
                 pass    # dummy loop to wait until producer is completed
         else:
+            testcaseEnv.numProducerThreadsRunning -= 1
+            logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d)
             testcaseEnv.lock.release()
             break
 
         counter += 1
+        logger.debug("calling testcaseEnv.lock.release()", extra=d)
         testcaseEnv.lock.release()
         time.sleep(int(producerSleepSec))
 
-    # let the main testcase know producer has stopped
-    testcaseEnv.lock.acquire()
-    testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = True
-    time.sleep(1)
-    testcaseEnv.lock.release()
-    time.sleep(1)
-
+    # wait until other producer threads also stops and
+    # let the main testcase know all producers have stopped
+    while 1:
+        testcaseEnv.lock.acquire()
+        time.sleep(1)
+        if testcaseEnv.numProducerThreadsRunning == 0:
+            testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = True
+            testcaseEnv.lock.release()
+            break
+        else:
+            logger.debug("waiting for TRUE of testcaseEnv.userDefinedEnvVarDict['backgroundProducerStopped']", extra=d)
+            testcaseEnv.lock.release()
+        time.sleep(1)
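
The shutdown handshake relies on a counter guarded by testcaseEnv.lock: each producer thread decrements it once it sees the stop flag, then all threads spin until the counter reaches zero, and backgroundProducerStopped is raised for the main testcase. A minimal standalone sketch of the same pattern, with simplified stand-in names:

    import threading, time

    lock  = threading.Lock()
    state = {"running": 2, "stopBackgroundProducer": True, "backgroundProducerStopped": False}

    def producer_thread():
        while 1:
            lock.acquire()
            if state["stopBackgroundProducer"]:
                state["running"] -= 1                       # this producer is done
                lock.release()
                break
            lock.release()
            time.sleep(0.1)                                 # stands in for one produce batch
        while 1:                                            # wait for the other producers too
            lock.acquire()
            if state["running"] == 0:
                state["backgroundProducerStopped"] = True   # signal the main testcase
                lock.release()
                break
            lock.release()
            time.sleep(0.1)

    threads = [threading.Thread(target=producer_thread) for _ in range(2)]
    for t in threads: t.start()
    for t in threads: t.join()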
 
 def stop_remote_entity(systemTestEnv, entityId, parentPid):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
@@ -676,8 +922,6 @@ def stop_remote_entity(systemTestEnv, en
 
     logger.debug("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
     system_test_utils.sigterm_remote_process(hostname, pidStack)
-#    time.sleep(1)
-#    system_test_utils.sigkill_remote_process(hostname, pidStack)
 
 
 def force_stop_remote_entity(systemTestEnv, entityId, parentPid):
@@ -694,33 +938,47 @@ def create_topic(systemTestEnv, testcase
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
     prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
-    prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"])
-    prodTopicList   = prodPerfCfgDict[0]["topic"].split(',')
 
-    zkEntityId      = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
-    zkHost          = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname")
-    kafkaHome       = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
-    javaHome        = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
-    createTopicBin  = kafkaHome + "/bin/kafka-create-topic.sh"
-
-    logger.info("zkEntityId : " + zkEntityId, extra=d)
-    logger.info("createTopicBin : " + createTopicBin, extra=d)
-
-    for topic in prodTopicList:
-        logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] + "]", extra=d) 
-        cmdList = ["ssh " + zkHost,
-                   "'JAVA_HOME=" + javaHome,
-                   createTopicBin,
-                   " --topic "     + topic,
-                   " --zookeeper " + testcaseEnv.userDefinedEnvVarDict["zkConnectStr"],
-                   " --replica "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
-                   " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " &> ",
-                   testcaseEnv.testCaseBaseDir + "/logs/create_topic.log'"]
+    for prodPerfCfg in prodPerfCfgList:
+        topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfg["entity_id"], "topic")
+        zkEntityId      = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
+        zkHost          = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname")
+        kafkaHome       = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
+        javaHome        = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
+        createTopicBin  = kafkaHome + "/bin/kafka-create-topic.sh"
+
+        logger.debug("zkEntityId : " + zkEntityId, extra=d)
+        logger.debug("createTopicBin : " + createTopicBin, extra=d)
+
+        if len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) > 0:
+            logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] + "]", extra=d) 
+            cmdList = ["ssh " + zkHost,
+                       "'JAVA_HOME=" + javaHome,
+                       createTopicBin,
+                       " --topic "     + topic,
+                       " --zookeeper " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
+                       " --replica "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
+                       " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
+                       testcaseEnv.testCaseBaseDir + "/logs/create_source_cluster_topic.log'"]
 
-        cmdStr = " ".join(cmdList)
-        logger.debug("executing command: [" + cmdStr + "]", extra=d)
-        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+            cmdStr = " ".join(cmdList)
+            logger.debug("executing command: [" + cmdStr + "]", extra=d)
+            subproc = system_test_utils.sys_call_return_subproc(cmdStr)
 
+        if len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0:
+            logger.info("creating topic: [" + topic + "] at: [" + testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"] + "]", extra=d) 
+            cmdList = ["ssh " + zkHost,
+                       "'JAVA_HOME=" + javaHome,
+                       createTopicBin,
+                       " --topic "     + topic,
+                       " --zookeeper " + testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"],
+                       " --replica "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
+                       " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ",
+                       testcaseEnv.testCaseBaseDir + "/logs/create_target_cluster_topic.log'"]
+
+            cmdStr = " ".join(cmdList)
+            logger.debug("executing command: [" + cmdStr + "]", extra=d)
+            subproc = system_test_utils.sys_call_return_subproc(cmdStr)
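
The two blocks above differ only in which zk.connect string they target and which log file they append to; the same commands could also come from one loop over the cluster labels, as in this hypothetical compaction:

    def create_topic_cmds(topic, envVarDict, replicaFactor, numPartition):
        cmds = []
        for label in ("source", "target"):
            zkConnectStr = envVarDict[label + "ZkConnectStr"]
            if len(zkConnectStr) == 0:
                continue                                   # that cluster is not in play
            cmds.append("kafka-create-topic.sh --topic " + topic +
                        " --zookeeper " + zkConnectStr +
                        " --replica "   + replicaFactor +
                        " --partition " + numPartition +
                        " >> logs/create_" + label + "_cluster_topic.log")
        return cmds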
 
 def get_message_id(logPathName):
     logLines      = open(logPathName, "r").readlines()
@@ -735,44 +993,80 @@ def get_message_id(logPathName):
 
     return messageIdList
 
+def get_message_checksum(logPathName):
+    logLines = open(logPathName, "r").readlines()
+    messageChecksumList = []
+
+    for line in logLines:
+        if not "checksum:" in line:
+            continue
+        else:
+            matchObj = re.match('.*checksum:(\d+).*', line)
+            if matchObj is not None:
+                messageChecksumList.append( matchObj.group(1) )
+            else:
+                logger.error("unexpected log line : " + line, extra=d)
+
+    return messageChecksumList
+
 
 def validate_data_matched(systemTestEnv, testcaseEnv):
     validationStatusDict        = testcaseEnv.validationStatusDict
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
-    producerEntityId = system_test_utils.get_data_by_lookup_keyval( \
-                           clusterEntityConfigDictList, "role", "producer_performance", "entity_id")
-    consumerEntityId = system_test_utils.get_data_by_lookup_keyval( \
+    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
+    consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
+
+    for prodPerfCfg in prodPerfCfgList:
+        producerEntityId = prodPerfCfg["entity_id"]
+        topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+
+        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
                            clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
-    msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \
-                           testcaseEnv, "console_consumer", consumerEntityId, "default") + \
-                           "/msg_id_missing_in_consumer.log"
-
-    producerMsgIdList  = get_message_id(testcaseEnv.userDefinedEnvVarDict["producerLogPathName"])
-    consumerMsgIdList  = get_message_id(testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"])
-    producerMsgIdSet   = set(producerMsgIdList)
-    consumerMsgIdSet   = set(consumerMsgIdList)
-
-    missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
-
-    outfile = open(msgIdMissingInConsumerLogPathName, "w")
-    for id in missingMsgIdInConsumer:
-        outfile.write(id + "\n")
-    outfile.close()
 
-    logger.info("no. of unique messages sent from publisher  : " + str(len(producerMsgIdSet)), extra=d)
-    logger.info("no. of unique messages received by consumer : " + str(len(consumerMsgIdSet)), extra=d)
-    validationStatusDict["Unique messages from producer"] = str(len(producerMsgIdSet))
-    validationStatusDict["Unique messages from consumer"] = str(len(consumerMsgIdSet))
-
-    if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
-        validationStatusDict["Validate for data matched"] = "PASSED"
-        return True
-    else:
-        validationStatusDict["Validate for data matched"] = "FAILED"
-        logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)
-        return False
-     
+        matchingConsumerEntityId = None
+        for consumerEntityId in consumerEntityIdList:
+            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            if consumerTopic in topic:
+                matchingConsumerEntityId = consumerEntityId
+                break
+
+        if matchingConsumerEntityId is None:
+            continue
+
+        msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \
+                           testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") + "/msg_id_missing_in_consumer.log"
+        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
+        producerLogPathName = producerLogPath + "/producer_performance.log"
+
+        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
+        consumerLogPathName = consumerLogPath + "/console_consumer.log"
+
+        producerMsgIdList  = get_message_id(producerLogPathName)
+        consumerMsgIdList  = get_message_id(consumerLogPathName)
+        producerMsgIdSet   = set(producerMsgIdList)
+        consumerMsgIdSet   = set(consumerMsgIdList)
+
+        missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
+
+        outfile = open(msgIdMissingInConsumerLogPathName, "w")
+        for id in missingMsgIdInConsumer:
+            outfile.write(id + "\n")
+        outfile.close()
+
+        logger.info("no. of unique messages on topic [" + topic + "] sent from publisher  : " + str(len(producerMsgIdSet)), extra=d)
+        logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgIdSet)), extra=d)
+        validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
+        validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet))
+
+        if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
+            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
+            #return True
+        else:
+            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
+            logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)
+            #return False
+
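
Per topic, the validation reduces producer and consumer logs to MessageID sets and inspects the producer-minus-consumer difference. A minimal sketch with made-up ids:

    producerMsgIdSet = set(["0", "1", "2", "3"])
    consumerMsgIdSet = set(["0", "1", "3"])
    missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet   # -> set(["2"])
    # PASSED only when nothing is missing and the producer actually sent something
    passed = len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0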
 
 def validate_leader_election_successful(testcaseEnv, leaderDict, validationStatusDict):
 
@@ -780,7 +1074,7 @@ def validate_leader_election_successful(
         try:
             leaderBrokerId = leaderDict["brokerid"]
             leaderEntityId = leaderDict["entity_id"]
-            leaderPid      = testcaseEnv.entityParentPidDict[leaderEntityId]
+            leaderPid      = testcaseEnv.entityBrokerParentPidDict[leaderEntityId]
             hostname       = leaderDict["hostname"]
 
             logger.info("found leader in entity [" + leaderEntityId + "] with brokerid [" + \
@@ -888,8 +1182,7 @@ def ps_grep_terminate_running_entity(sys
 
         system_test_utils.sys_call(cmdStr) 
 
-
-def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict):
+def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttributesDict):
     leaderEntityId = None
     leaderBrokerId = None
     leaderPPid     = None
@@ -897,14 +1190,13 @@ def get_reelection_latency(systemTestEnv
 
     if testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
         # leader election is not successful - something is wrong => so skip this testcase
-        #continue
         return None
     else:
         # leader elected => stop leader
         try:
             leaderEntityId = leaderDict["entity_id"]
             leaderBrokerId = leaderDict["brokerid"]
-            leaderPPid     = testcaseEnv.entityParentPidDict[leaderEntityId]
+            leaderPPid     = testcaseEnv.entityBrokerParentPidDict[leaderEntityId]
         except:
             logger.info("leader details unavailable", extra=d)
             raise
@@ -912,22 +1204,260 @@ def get_reelection_latency(systemTestEnv
         logger.info("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid, extra=d)
         stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid)
 
-    logger.info("sleeping for 5s for leader re-election to complete", extra=d)
-    time.sleep(5)
+    logger.info("sleeping for 10s for leader re-election to complete", extra=d)
+    time.sleep(10)
 
     # get broker shut down completed timestamp
-    shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv)
-    #print shutdownBrokerDict
+    shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict)
     logger.debug("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownBrokerDict["timestamp"])), extra=d)
 
-    logger.debug("looking up new leader", extra=d)
-    leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv)
-    #print leaderDict2
+    logger.info("looking up new leader", extra=d)
+
+    leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict)
     logger.debug("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])), extra=d)
+
     leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownBrokerDict["timestamp"])
     logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
-    #testcaseEnv.validationStatusDict["Leader Election Latency"] = str("{0:.2f}".format(leaderReElectionLatency * 1000)) + " ms"
  
     return leaderReElectionLatency
 
+def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv):
+
+    anonLogger.info("================================================")
+    anonLogger.info("validating broker log segment checksums")
+    anonLogger.info("================================================")
+
+    # brokerLogCksumDict -
+    #   a dictionary that keeps track of the log segment files in each broker; it looks like this:
+    #
+    # {u'broker-1': {'test_1-0/00000000000000000000.log': '91500855',
+    #                'test_1-0/00000000000000010255.log': '1906285795',
+    #                'test_1-1/00000000000000000000.log': '3785861722',
+    #                'test_1-1/00000000000000010322.log': '1731628308'},
+    #  u'broker-2': {'test_1-0/00000000000000000000.log': '91500855',
+    #                'test_1-0/00000000000000010255.log': '1906285795',
+    #                'test_1-1/00000000000000000000.log': '3785861722',
+    #                'test_1-1/00000000000000010322.log': '1731628308'},
+    #  u'broker-3': {'test_1-0/00000000000000000000.log': '91500855',
+    #                'test_1-0/00000000000000010255.log': '1906285795',
+    #                'test_1-1/00000000000000000000.log': '3431356313'}}
+    brokerLogCksumDict = {}
+
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "broker", "entity_id")
+
+    # access all brokers' hosts to get broker id and its corresponding log
+    # segment file checksums and populate brokerLogCksumDict
+    for brokerEntityId in brokerEntityIdList:
+        logCksumDict       = {}
+
+        hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
+        logDir   = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log.dir")
+
+        # get the log segment file full path name
+        cmdStr   = "ssh " + hostname + " \"find " + logDir + " -name '*.log'\" 2> /dev/null"
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+        subproc  = system_test_utils.sys_call_return_subproc(cmdStr)
+        for line in subproc.stdout.readlines():
+            # Need a key to identify each corresponding log segment file across brokers:
+            #   part of the full log segment file path is used as the key to identify
+            #   the individual log segment file checksums. The key is extracted from the
+            #   path name starting at "topic-partition", e.g.:
+            #     full log segment path name   : /tmp/kafka_server_1_logs/test_1-0/00000000000000010255.log
+            #     part of the path name as key : test_1-0/00000000000000010255.log
+            logSegmentPathName = line.rstrip('\n')
+            substrIndex        = logSegmentPathName.index(logDir) + len(logDir + "/")
+            logSegmentFile     = logSegmentPathName[substrIndex:]
+
+            # get log segment file checksum
+            cksumCmdStr = "ssh " + hostname + " \"cksum " + logSegmentPathName + " | cut -f1 -d ' '\" 2> /dev/null"
+            subproc2 = system_test_utils.sys_call_return_subproc(cksumCmdStr)
+            for line2 in subproc2.stdout.readlines():
+                checksum = line2.rstrip('\n')
+                # use logSegmentFile as a key to associate with its checksum
+                logCksumDict[logSegmentFile] = checksum
+
+        # associate this logCksumDict with its broker id
+        brokerLogCksumDict["broker-"+brokerEntityId] = logCksumDict
+
+    # use a list of sets for checksum comparison
+    sets = []
+    for brokerId, logCksumDict in brokerLogCksumDict.items():
+        sets.append( set(logCksumDict.items()) )
+
+    # loop through the sets and compare broker[n] with broker[n+1] ...
+    idx = 0
+    diffItemSet = None
+    while idx < len(sets) - 1:
+        diffItemSet = sets[idx] ^ sets[idx + 1]
+
+        if (len(diffItemSet) > 0):
+            logger.error("Mismatch found : " + str(diffItemSet), extra=d)
+            testcaseEnv.validationStatusDict["Log segment checksum matching across all replicas"] = "FAILED"
+
+            # get the mismatched items key, i.e. the log segment file name
+            diffItemList = list(diffItemSet)
+            diffItemKeys = []
+            # each item is a (logSegmentFile, checksum) tuple - keep the file name only
+            for keyvalTuple in diffItemList:
+                diffItemKeys.append(keyvalTuple[0])
+
+            # mismatch found - so print out the whole log segment file checksum
+            # info with the mismatched checksum highlighted
+            for brokerId in sorted(brokerLogCksumDict.iterkeys()):
+                logCksumDict = brokerLogCksumDict[brokerId]
+                print brokerId,":"
+                for logSegmentFile in sorted(logCksumDict.iterkeys()):
+                    checksum = logCksumDict[logSegmentFile]
+                    sys.stdout.write(logSegmentFile + " => " + checksum)
+                    if logSegmentFile in diffItemKeys:
+                        sys.stdout.write("    <<<< not matching across all replicas")
+                    print
+                print
+            return
+        idx += 1
+
+    # getting here means all log segment checksums matched
+    testcaseEnv.validationStatusDict["Log segment checksum matching across all replicas"] = "PASSED"
+
+    anonLogger.info("log segment files checksum :")
+    print
+    pprint.pprint(brokerLogCksumDict)
+    print
+
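+# For illustration only: a minimal sketch (with hypothetical sample data) of the
+# pairwise comparison used above - the symmetric difference of adjacent brokers'
+# (logSegmentFile, checksum) item sets is empty iff the replicas match:
+def _example_log_cksum_comparison():
+    brokerLogCksumDict = {
+        "broker-1": {"test_1-0/00000000000000000000.log": "91500855"},
+        "broker-2": {"test_1-0/00000000000000000000.log": "91500855"},
+    }
+    itemSets = [set(logCksumDict.items()) for logCksumDict in brokerLogCksumDict.values()]
+    # an empty symmetric difference means broker-1 and broker-2 agree
+    return itemSets[0] ^ itemSets[1]
+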
+def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
+
+    entityConfigs = systemTestEnv.clusterEntityConfigDictList
+
+    for hostname, producerPPid in testcaseEnv.producerHostParentPidDict.items():
+        producerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id")
+        stop_remote_entity(systemTestEnv, producerEntityId, producerPPid)
+
+    for hostname, consumerPPid in testcaseEnv.consumerHostParentPidDict.items():
+        consumerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id")
+        stop_remote_entity(systemTestEnv, consumerEntityId, consumerPPid)
+
+    for entityId, jmxParentPidList in testcaseEnv.entityJmxParentPidDict.items():
+        for jmxParentPid in jmxParentPidList:
+            stop_remote_entity(systemTestEnv, entityId, jmxParentPid)
+
+    for entityId, mirrorMakerParentPid in testcaseEnv.entityMirrorMakerParentPidDict.items():
+        stop_remote_entity(systemTestEnv, entityId, mirrorMakerParentPid)
+
+    for entityId, brokerParentPid in testcaseEnv.entityBrokerParentPidDict.items():
+        stop_remote_entity(systemTestEnv, entityId, brokerParentPid)
+
+    for entityId, zkParentPid in testcaseEnv.entityZkParentPidDict.items():
+        stop_remote_entity(systemTestEnv, entityId, zkParentPid)
+
+
+def start_migration_tool(systemTestEnv, testcaseEnv):
+    clusterConfigList = systemTestEnv.clusterEntityConfigDictList
+    migrationToolConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "migration_tool")
+
+    migrationToolConfig = migrationToolConfigList[0]
+    host              = migrationToolConfig["hostname"]
+    entityId          = migrationToolConfig["entity_id"]
+    role              = migrationToolConfig["role"]
+    kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "kafka_home")
+    javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "java_home")
+    jmxPort           = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "jmx_port")
+    kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
+
+    logger.info("starting kafka migration tool", extra=d)
+    migrationToolLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "migration_tool", entityId, "default")
+    migrationToolLogPathName = migrationToolLogPath + "/migration_tool.log"
+    testcaseEnv.userDefinedEnvVarDict["migrationToolLogPathName"] = migrationToolLogPathName
+
+    testcaseConfigsList = testcaseEnv.testcaseConfigsList
+    numProducers    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "num.producers")
+    numStreams      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "num.streams")
+    producerConfig  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer.config")
+    consumerConfig  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "consumer.config")
+    zkClientJar     = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "zkclient.01.jar")
+    kafka07Jar      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "kafka.07.jar")
+    whiteList       = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "whitelist")
+    logFile         = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename")
+
+    cmdList = ["ssh " + host,
+               "'JAVA_HOME=" + javaHome,
+               "JMX_PORT=" + jmxPort,
+               kafkaRunClassBin + " kafka.tools.KafkaMigrationTool",
+               "--whitelist="        + whiteList,
+               "--num.producers="    + numProducers,
+               "--num.streams="      + numStreams,
+               "--producer.config="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + producerConfig,
+               "--consumer.config="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + consumerConfig,
+               "--zkclient.01.jar="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + zkClientJar,
+               "--kafka.07.jar="     + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + kafka07Jar,
+               " &> " + migrationToolLogPath + "/migrationTool.log",
+               " & echo pid:$! > " + migrationToolLogPath + "/entity_" + entityId + "_pid'"]
+
+    cmdStr = " ".join(cmdList)
+    logger.debug("executing command: [" + cmdStr + "]", extra=d)
+    system_test_utils.async_sys_call(cmdStr)
+
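+# For illustration only: the launch convention assembled above backgrounds the
+# remote process and records its pid via the shell's "$!", e.g. (hypothetical
+# paths, bash syntax):
+#
+#   ssh host 'some_tool &> /tmp/tool.log & echo pid:$! > /tmp/entity_99_pid'
+#
+# The "pid:<pid>" value is presumably read back later so the process can be
+# stopped via stop_remote_entity().
+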
+def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv):
+    validationStatusDict        = testcaseEnv.validationStatusDict
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
+    consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
+
+    for prodPerfCfg in prodPerfCfgList:
+        producerEntityId = prodPerfCfg["entity_id"]
+        topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+
+        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
+                               clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
+
+        matchingConsumerEntityId = None
+        for consumerEntityId in consumerEntityIdList:
+            consumerTopic = system_test_utils.get_data_by_lookup_keyval(
+                            testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            if consumerTopic in topic:
+                matchingConsumerEntityId = consumerEntityId
+                break
+
+        if matchingConsumerEntityId is None:
+            break
+
+        msgChecksumMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( 
+                                                  testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \
+                                                  + "/msg_checksum_missing_in_consumer.log"
+        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
+        producerLogPathName = producerLogPath + "/producer_performance.log"
+
+        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
+        consumerLogPathName = consumerLogPath + "/console_consumer.log"
+
+        producerMsgChecksumList   = get_message_checksum(producerLogPathName)
+        consumerMsgChecksumList   = get_message_checksum(consumerLogPathName)
+        producerMsgChecksumSet    = set(producerMsgChecksumList)
+        consumerMsgChecksumSet    = set(consumerMsgChecksumList)
+
+        missingMsgChecksumInConsumer = producerMsgChecksumSet - consumerMsgChecksumSet
+
+        outfile = open(msgChecksumMissingInConsumerLogPathName, "w")
+        for missingCksum in missingMsgChecksumInConsumer:
+            outfile.write(missingCksum + "\n")
+        outfile.close()
+
+        logger.info("no. of unique messages on topic [" + topic + "] sent from publisher  : " + str(len(producerMsgChecksumList)), extra=d)
+        logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgChecksumList)), extra=d)
+        validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgChecksumList))
+        validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgChecksumList))
+
+        if ( len(missingMsgChecksumInConsumer) == 0 and len(producerMsgChecksumList) > 0 ):
+            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
+        else:
+            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
+            logger.info("See " + msgChecksumMissingInConsumerLogPathName + " for missing MessageID", extra=d)
+
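+# For illustration only: the core of the validation above is a set difference
+# between producer-side and consumer-side message checksums (hypothetical data):
+def _example_missing_checksums():
+    producerMsgChecksumSet = set(["cksum-1", "cksum-2", "cksum-3"])
+    consumerMsgChecksumSet = set(["cksum-1", "cksum-3"])
+    # a non-empty result means some messages never reached the consumer => FAILED
+    return producerMsgChecksumSet - consumerMsgChecksumSet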
 

Modified: incubator/kafka/branches/0.8/system_test/utils/metrics.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/metrics.py?rev=1396687&r1=1396686&r2=1396687&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/metrics.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/metrics.py Wed Oct 10 16:56:57 2012
@@ -239,7 +239,7 @@ def start_metrics_collection(jmxHost, jm
         system_test_utils.async_sys_call(startMetricsCommand)
         time.sleep(1)
 
-        pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid 2> /dev/null'"
+        pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid' 2> /dev/null"
         logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
         subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
 

Added: incubator/kafka/branches/0.8/system_test/utils/replication_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/replication_utils.py?rev=1396687&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/replication_utils.py (added)
+++ incubator/kafka/branches/0.8/system_test/utils/replication_utils.py Wed Oct 10 16:56:57 2012
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#!/usr/bin/env python
+
+# =================================================================
+# replication_utils.py
+# - This module defines constant values specific to Kafka Replication
+#   and also provides helper functions for Replication system test.
+# =================================================================
+
+import logging
+import sys
+
+class ReplicationUtils(object):
+
+    thisClassName = '(ReplicationUtils)'
+    d = {'name_of_class': thisClassName}
+
+    logger     = logging.getLogger("namedLogger")
+    anonLogger = logging.getLogger("anonymousLogger")
+
+    def __init__(self, testClassInstance):
+        super(ReplicationUtils, self).__init__()
+        self.logger.debug("#### constructor inside ReplicationUtils", extra=self.d)
+
+        # leader attributes
+        self.isLeaderLogPattern             = "Completed the leader state transition"
+        self.brokerShutDownCompletedPattern = "shut down completed"
+
+        self.leaderAttributesDict = {}
+
+        self.leaderAttributesDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] = \
+            self.brokerShutDownCompletedPattern
+
+        self.leaderAttributesDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"] = \
+            "\[(.*?)\] .* \[Kafka Server (.*?)\], " + \
+            self.brokerShutDownCompletedPattern
+
+        self.leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] = \
+            self.isLeaderLogPattern
+
+        self.leaderAttributesDict["REGX_LEADER_ELECTION_PATTERN"]  = \
+            "\[(.*?)\] .* Broker (.*?): " + \
+            self.leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] + \
+            " for topic (.*?) partition (.*?) \(.*"
+
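+# For illustration only: a sketch of how the patterns above are meant to be
+# applied against a broker log line (the sample line below is hypothetical):
+def _example_match_leader_election(leaderAttributesDict):
+    import re
+    sampleLogLine = "[2012-10-10 16:56:57,123] INFO Broker 1: " + \
+                    "Completed the leader state transition for topic test_1 partition 0 (...)"
+    matchObj = re.match(leaderAttributesDict["REGX_LEADER_ELECTION_PATTERN"], sampleLogLine)
+    if matchObj is not None:
+        # groups: timestamp, broker id, topic, partition
+        return matchObj.groups()
+    return None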

Modified: incubator/kafka/branches/0.8/system_test/utils/setup_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/setup_utils.py?rev=1396687&r1=1396686&r2=1396687&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/setup_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/setup_utils.py Wed Oct 10 16:56:57 2012
@@ -16,11 +16,16 @@
 # under the License.
 #!/usr/bin/env python
 
+# =================================================================
+# setup_utils.py
+# - This module provides some basic helper functions.
+# =================================================================
+
 import logging
 import kafka_system_test_utils
 import sys
 
-class SetupUtils():
+class SetupUtils(object):
 
     # dict to pass user-defined attributes to logger argument: "extra"
     # to use: just update "thisClassName" to the appropriate value
@@ -32,8 +37,7 @@ class SetupUtils():
 
     def __init__(self):
         d = {'name_of_class': self.__class__.__name__}
-        self.logger.info("constructor", extra=SetUpUtils.d)
-
+        self.logger.debug("#### constructor inside SetupUtils", extra=self.d)
 
     def log_message(self, message):
         print

Modified: incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py?rev=1396687&r1=1396686&r2=1396687&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py Wed Oct 10 16:56:57 2012
@@ -20,6 +20,7 @@
 # system_test_utils.py
 # ===================================
 
+import copy
+import hashlib  # required by get_md5_for_file (added below)
 import inspect
 import json
 import logging
@@ -31,7 +32,8 @@ import subprocess
 import sys
 import time
 
-logger = logging.getLogger("namedLogger")
+logger  = logging.getLogger("namedLogger")
+aLogger = logging.getLogger("anonymousLogger")
 thisClassName = '(system_test_utils)'
 d = {'name_of_class': thisClassName}
 
@@ -319,6 +321,14 @@ def remote_host_processes_stopped(hostna
 
 
 def setup_remote_hosts(systemTestEnv):
+    # sanity check on remote hosts to make sure:
+    # - all directories (eg. java_home) specified in cluster_config.json exists in all hosts
+    # - no conflicting running processes in remote hosts
+
+    aLogger.info("=================================================")
+    aLogger.info("setting up remote hosts ...")
+    aLogger.info("=================================================")
+
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
     localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
@@ -353,19 +363,19 @@ def setup_remote_hosts(systemTestEnv):
         kafkaHome = clusterEntityConfigDict["kafka_home"]
         javaHome  = clusterEntityConfigDict["java_home"]
 
-        logger.info("checking java binary [" + localJavaBin + "] in host [" + hostname + "]", extra=d)
+        logger.debug("checking java binary [" + localJavaBin + "] in host [" + hostname + "]", extra=d)
         if not remote_host_directory_exists(hostname, javaHome):
             logger.error("Directory not found: [" + javaHome + "] in host [" + hostname + "]", extra=d)
             return False
 
-        logger.info("checking directory [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
+        logger.debug("checking directory [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
         if not remote_host_directory_exists(hostname, kafkaHome):
             logger.info("Directory not found: [" + kafkaHome + "] in host [" + hostname + "]", extra=d)
             if hostname == "localhost":
                 return False
             else:
                 localKafkaSourcePath = systemTestEnv.SYSTEM_TEST_BASE_DIR + "/.."
-                logger.info("copying local copy of [" + localKafkaSourcePath + "] to " + hostname + ":" + kafkaHome, extra=d)
+                logger.debug("copying local copy of [" + localKafkaSourcePath + "] to " + hostname + ":" + kafkaHome, extra=d)
                 copy_source_to_remote_hosts(hostname, localKafkaSourcePath, kafkaHome)
 
     return True
@@ -385,16 +395,135 @@ def remove_kafka_home_dir_at_remote_host
     if remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
         cmdStr  = "ssh " + hostname + " 'chmod -R 777 " + kafkaHome + "'"
         logger.info("executing command [" + cmdStr + "]", extra=d)
-        system_test_utils.sys_call(cmdStr)
+        sys_call(cmdStr)
 
         cmdStr  = "ssh " + hostname + " 'rm -rf " + kafkaHome + "'"
         logger.info("executing command [" + cmdStr + "]", extra=d)
-        #system_test_utils.sys_call(cmdStr)
+        #sys_call(cmdStr)
     else:
         logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
         logger.warn("check config file: system_test/cluster_config.properties", extra=d)
         logger.warn("aborting test...", extra=d)
         sys.exit(1)
 
+def get_md5_for_file(filePathName, blockSize=8192):
+    md5 = hashlib.md5()
+    f   = open(filePathName, 'rb')
+
+    while True:
+        data = f.read(blockSize)
+        if not data:
+            break
+        md5.update(data)
+    f.close()
+    return md5.digest()
+
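+# For illustration only - a hypothetical usage sketch: comparing two files by
+# MD5 digest instead of by raw content:
+def _example_files_match(filePathName1, filePathName2):
+    return get_md5_for_file(filePathName1) == get_md5_for_file(filePathName2)
+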
+def load_cluster_config(clusterConfigPathName, clusterEntityConfigDictList):
+    # empty the list
+    clusterEntityConfigDictList[:] = []
+
+    # retrieve each entity's data from cluster config json file
+    # as "dict" and enter them into a "list"
+    jsonFileContent = open(clusterConfigPathName, "r").read()
+    jsonData        = json.loads(jsonFileContent)
+    for key, cfgList in jsonData.items():
+        if key == "cluster_config":
+            for cfg in cfgList:
+                clusterEntityConfigDictList.append(cfg)
+
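+# For illustration only - a hypothetical usage sketch. Note that the function
+# mutates the passed-in list in place (via "[:] =" and append), so references
+# held by callers remain valid:
+#
+#   clusterEntityConfigDictList = []
+#   load_cluster_config("/path/to/cluster_config.json", clusterEntityConfigDictList)
+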
+def setup_remote_hosts_with_testcase_level_cluster_config(systemTestEnv, testCasePathName):
+    # =======================================================================
+    # starting a new testcase, check for local cluster_config.json
+    # =======================================================================
+    # 1. if there is a xxxx_testsuite/testcase_xxxx/cluster_config.json
+    #    => load it into systemTestEnv.clusterEntityConfigDictList
+    # 2. if there is NO testcase_xxxx/cluster_config.json but has a xxxx_testsuite/cluster_config.json
+#    => restore systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite
+    # 3. if there is NO testcase_xxxx/cluster_config.json NOR xxxx_testsuite/cluster_config.json
+    #    => restore system_test/cluster_config.json
+
+    testCaseLevelClusterConfigPathName = testCasePathName + "/cluster_config.json"
+
+    if os.path.isfile(testCaseLevelClusterConfigPathName):
+        # if there is a cluster_config.json in this directory, load it and use it for this testsuite
+        logger.info("found a new cluster_config : " + testCaseLevelClusterConfigPathName, extra=d)
+
+        # empty the current cluster config list
+        systemTestEnv.clusterEntityConfigDictList[:] = []
+
+        # load the cluster config for this testcase level
+        load_cluster_config(testCaseLevelClusterConfigPathName, systemTestEnv.clusterEntityConfigDictList)
+
+        # back up this testcase level cluster config
+        systemTestEnv.clusterEntityConfigDictListLastFoundInTestCase = copy.deepcopy(systemTestEnv.clusterEntityConfigDictList)
+
+    elif len(systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite) > 0:
+        # if there is NO testcase_xxxx/cluster_config.json, but has a xxxx_testsuite/cluster_config.json
+        # => restore the config in xxxx_testsuite/cluster_config.json
+
+        # empty the current cluster config list
+        systemTestEnv.clusterEntityConfigDictList[:] = []
+
+        # restore the system_test/cluster_config.json
+        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite)
+
+    else:
+        # if there is NONE, restore the config in system_test/cluster_config.json
+
+        # empty the current cluster config list
+        systemTestEnv.clusterEntityConfigDictList[:] = []
+
+        # restore the system_test/cluster_config.json
+        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListInSystemTestLevel)
+
+    # set up remote hosts
+    if not setup_remote_hosts(systemTestEnv):
+        logger.error("Remote hosts sanity check failed. Aborting test ...", extra=d)
+        print
+        sys.exit(1)
+    print
+
+def setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, testModulePathName):
+    # =======================================================================
+    # starting a new testsuite, check for local cluster_config.json:
+    # =======================================================================
+# 1. if there is a xxxx_testsuite/cluster_config.json
+#    => load it into systemTestEnv.clusterEntityConfigDictList
+# 2. if there is NO xxxx_testsuite/cluster_config.json
+    #    => restore system_test/cluster_config.json
+
+    testSuiteLevelClusterConfigPathName = testModulePathName + "/cluster_config.json"
+
+    if os.path.isfile(testSuiteLevelClusterConfigPathName):
+        # if there is a cluster_config.json in this directory, load it and use it for this testsuite
+        logger.info("found a new cluster_config : " + testSuiteLevelClusterConfigPathName, extra=d)
+
+        # empty the current cluster config list
+        systemTestEnv.clusterEntityConfigDictList[:] = []
+
+        # load the cluster config for this testsuite level
+        load_cluster_config(testSuiteLevelClusterConfigPathName, systemTestEnv.clusterEntityConfigDictList)
+
+        # back up this testsuite level cluster config
+        systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite = copy.deepcopy(systemTestEnv.clusterEntityConfigDictList)
+
+    else:
+        # if there is NONE, restore the config in system_test/cluster_config.json
+
+        # empty the last testsuite level cluster config list
+        systemTestEnv.clusterEntityConfigDictListLastFoundInTestSuite[:] = []
+
+        # empty the current cluster config list
+        systemTestEnv.clusterEntityConfigDictList[:] = []
+
+        # restore the system_test/cluster_config.json
+        systemTestEnv.clusterEntityConfigDictList = copy.deepcopy(systemTestEnv.clusterEntityConfigDictListInSystemTestLevel)
+
+    # set up remote hosts
+    if not setup_remote_hosts(systemTestEnv):
+        logger.error("Remote hosts sanity check failed. Aborting test ...", extra=d)
+        print
+        sys.exit(1)
+    print
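+# For illustration only: the expected calling order of the two helpers above,
+# so that a testcase-level config can fall back to the last suite-level backup
+# (names are hypothetical):
+#
+#   setup_remote_hosts_with_testsuite_level_cluster_config(systemTestEnv, suiteDir)
+#   for testCasePathName in testCaseDirList:
+#       setup_remote_hosts_with_testcase_level_cluster_config(systemTestEnv, testCasePathName)
+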
+
 
 

Modified: incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/testcase_env.py?rev=1396687&r1=1396686&r2=1396687&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/testcase_env.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/testcase_env.py Wed Oct 10 16:56:57 2012
@@ -25,17 +25,31 @@ import os
 import sys
 import thread
 
+import system_test_utils
+
 class TestcaseEnv():
 
     # ================================
     # Generic testcase environment
     # ================================
 
-    # dictionary of entity_id to ppid for entities such as zookeepers & brokers
+    # dictionary of entity_id to ppid for Zookeeper entities
     # key: entity_id
-    # val: ppid of zk or broker associated to that entity_id
+    # val: ppid of Zookeeper associated to that entity_id
     # { 0: 12345, 1: 12389, ... }
-    entityParentPidDict = {}
+    entityZkParentPidDict = {}
+
+    # dictionary of entity_id to ppid for broker entities
+    # key: entity_id
+    # val: ppid of broker associated to that entity_id
+    # { 0: 12345, 1: 12389, ... }
+    entityBrokerParentPidDict = {}
+
+    # dictionary of entity_id to ppid for mirror-maker entities
+    # key: entity_id
+    # val: ppid of mirror maker associated to that entity_id
+    # { 0: 12345, 1: 12389, ... }
+    entityMirrorMakerParentPidDict = {}
 
     # dictionary of entity_id to list of JMX ppid
     # key: entity_id
@@ -67,8 +81,8 @@ class TestcaseEnv():
 
         # gather the test case related info and add to an SystemTestEnv object
         self.testcaseResultsDict = {}
-        self.testcaseResultsDict["test_class_name"]    = classInstance.__class__.__name__
-        self.testcaseResultsDict["test_case_name"]     = ""
+        self.testcaseResultsDict["_test_class_name"] = classInstance.__class__.__name__
+        self.testcaseResultsDict["_test_case_name"]  = ""
         self.validationStatusDict                      = {}
         self.testcaseResultsDict["validation_status"]  = self.validationStatusDict
         self.systemTestEnv.systemTestResultsList.append(self.testcaseResultsDict)
@@ -84,6 +98,8 @@ class TestcaseEnv():
         self.testCaseBaseDir       = ""
         self.testCaseLogsDir       = ""
         self.testCaseDashboardsDir = ""
+        self.testcasePropJsonPathName = ""
+        self.testcaseNonEntityDataDict = {}
 
         # ================================
         # dictionary to keep track of
@@ -103,4 +119,39 @@ class TestcaseEnv():
         # Lock object for producer threads synchronization
         self.lock = thread.allocate_lock()
 
+        self.numProducerThreadsRunning = 0
+
+    def initWithKnownTestCasePathName(self, testCasePathName):
+        testcaseDirName = os.path.basename(testCasePathName)
+        self.testcaseResultsDict["_test_case_name"] = testcaseDirName
+        self.testCaseBaseDir = testCasePathName
+        self.testCaseLogsDir = self.testCaseBaseDir + "/logs"
+        self.testCaseDashboardsDir = self.testCaseBaseDir + "/dashboards"
+
+        # find testcase properties json file
+        self.testcasePropJsonPathName = system_test_utils.get_testcase_prop_json_pathname(testCasePathName)
+
+        # get the dictionary that contains the testcase arguments and description
+        self.testcaseNonEntityDataDict = system_test_utils.get_json_dict_data(self.testcasePropJsonPathName)
+
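+    # For illustration only (hypothetical path): given testCasePathName
+    # ".../replication_testsuite/testcase_1", initWithKnownTestCasePathName above
+    # sets "_test_case_name" to "testcase_1" and testCaseLogsDir to
+    # ".../replication_testsuite/testcase_1/logs".
+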
+    def printTestCaseDescription(self, testcaseDirName):
+        testcaseDescription = ""
+        for k,v in self.testcaseNonEntityDataDict.items():
+            if ( k == "description" ):
+                testcaseDescription = v
+
+        print "\n"
+        print "======================================================================================="
+        print "Test Case Name :", testcaseDirName
+        print "======================================================================================="
+        print "Description    :"
+        for step in sorted(testcaseDescription.iterkeys()):
+            print "   ", step, ":", testcaseDescription[step]
+        print "======================================================================================="
+        print "Test Case Args :"
+        for k,v in self.testcaseArgumentsDict.items():
+            print "   ", k, " : ", v
+            self.testcaseResultsDict["arg : " + k] = v
+        print "======================================================================================="
+
 


