kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1418545 - in /kafka/branches/0.8/system_test: migration_tool_testsuite/migration_tool_test.py mirror_maker_testsuite/mirror_maker_test.py replication_testsuite/replica_basic_test.py utils/kafka_system_test_utils.py
Date Fri, 07 Dec 2012 23:07:17 GMT
Author: nehanarkhede
Date: Fri Dec  7 23:07:16 2012
New Revision: 1418545

URL: http://svn.apache.org/viewvc?rev=1418545&view=rev
Log:
KAFKA-644 https://svn.apache.org/repos/asf/kafka/branches/0.8; patched by John Fung; reviewed
by Neha Narkhede

Modified:
    kafka/branches/0.8/system_test/migration_tool_testsuite/migration_tool_test.py
    kafka/branches/0.8/system_test/mirror_maker_testsuite/mirror_maker_test.py
    kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
    kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py

Modified: kafka/branches/0.8/system_test/migration_tool_testsuite/migration_tool_test.py
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/migration_tool_testsuite/migration_tool_test.py?rev=1418545&r1=1418544&r2=1418545&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/migration_tool_testsuite/migration_tool_test.py (original)
+++ kafka/branches/0.8/system_test/migration_tool_testsuite/migration_tool_test.py Fri Dec
 7 23:07:16 2012
@@ -123,21 +123,21 @@ class MigrationToolTest(ReplicationUtils
                 # initialize signal handler
                 signal.signal(signal.SIGINT, self.signal_handler)
     
-                # create "LOCAL" log directories for metrics, dashboards for each entity
under this testcase
-                # for collecting logs from remote machines
-                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
-    
                 # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties
file:
                 #   system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
                 self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
                     self.testcaseEnv.testcasePropJsonPathName)
     
-                # TestcaseEnv - initialize producer & consumer config / log file pathnames
-                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
-
                 # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
                 kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv,
self.testcaseEnv)
 
+                # create "LOCAL" log directories for metrics, dashboards for each entity
under this testcase
+                # for collecting logs from remote machines
+                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
+    
+                # TestcaseEnv - initialize producer & consumer config / log file pathnames
+                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
+
                 # generate remote hosts log/config dirs if not exist
                 kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv,
self.testcaseEnv)
     

Modified: kafka/branches/0.8/system_test/mirror_maker_testsuite/mirror_maker_test.py
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/mirror_maker_testsuite/mirror_maker_test.py?rev=1418545&r1=1418544&r2=1418545&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/mirror_maker_testsuite/mirror_maker_test.py (original)
+++ kafka/branches/0.8/system_test/mirror_maker_testsuite/mirror_maker_test.py Fri Dec  7
23:07:16 2012
@@ -123,22 +123,21 @@ class MirrorMakerTest(ReplicationUtils, 
                 # initialize signal handler
                 signal.signal(signal.SIGINT, self.signal_handler)
 
-                
-                # create "LOCAL" log directories for metrics, dashboards for each entity
under this testcase
-                # for collecting logs from remote machines
-                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
-    
                 # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties
file:
                 #   system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
                 self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
                     self.testcaseEnv.testcasePropJsonPathName)
-    
-                # TestcaseEnv - initialize producer & consumer config / log file pathnames
-                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
-
+                 
                 # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
                 kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv,
self.testcaseEnv)
 
+                # create "LOCAL" log directories for metrics, dashboards for each entity
under this testcase
+                # for collecting logs from remote machines
+                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
+
+                # TestcaseEnv - initialize producer & consumer config / log file pathnames
+                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
+
                 # generate remote hosts log/config dirs if not exist
                 kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv,
self.testcaseEnv)
     

Modified: kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1418545&r1=1418544&r2=1418545&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Fri Dec  7
23:07:16 2012
@@ -139,22 +139,21 @@ class ReplicaBasicTest(ReplicationUtils,
                 # initialize signal handler
                 signal.signal(signal.SIGINT, self.signal_handler)
     
-                # create "LOCAL" log directories for metrics, dashboards for each entity
under this testcase
-                # for collecting logs from remote machines
-                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
-    
                 # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties
file:
                 #   system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
                 self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
                     self.testcaseEnv.testcasePropJsonPathName)
-    
-                # TestcaseEnv - initialize producer & consumer config / log file pathnames
-                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
 
-                
                 # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
                 kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv,
self.testcaseEnv)
 
+                # create "LOCAL" log directories for metrics, dashboards for each entity
under this testcase
+                # for collecting logs from remote machines
+                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
+   
+                # TestcaseEnv - initialize producer & consumer config / log file pathnames
+                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
+
                 # generate remote hosts log/config dirs if not exist
                 kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv,
self.testcaseEnv)
     

Modified: kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1418545&r1=1418544&r2=1418545&view=diff
==============================================================================
--- kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Fri Dec  7 23:07:16 2012
@@ -131,6 +131,7 @@ def collect_logs_from_remote_hosts(syste
         hostname   = clusterEntityConfigDict["hostname"]
         entity_id  = clusterEntityConfigDict["entity_id"]
         role       = clusterEntityConfigDict["role"]
+        kafkaHome  = clusterEntityConfigDict["kafka_home"]
 
         logger.debug("entity_id : " + entity_id, extra=d)
         logger.debug("hostname  : " + hostname,  extra=d)
@@ -139,12 +140,19 @@ def collect_logs_from_remote_hosts(syste
         configPathName     = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id,
"config")
         metricsPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id,
"metrics")
         logPathName        = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id,
"default")
+        rmtLogPathName     = logPathName
+        rmtMetricsPathName = metricsPathName
+
+        if hostname != "localhost":
+            rmtConfigPathName  = replace_kafka_home(configPathName, kafkaHome)
+            rmtMetricsPathName = replace_kafka_home(metricsPathName, kafkaHome)
+            rmtLogPathName     = replace_kafka_home(logPathName, kafkaHome)
 
         # ==============================
         # collect entity log file
         # ==============================
         cmdList = ["scp",
-                   hostname + ":" + logPathName + "/*",
+                   hostname + ":" + rmtLogPathName + "/*",
                    logPathName]
         cmdStr  = " ".join(cmdList)
         logger.debug("executing command [" + cmdStr + "]", extra=d)
@@ -154,7 +162,7 @@ def collect_logs_from_remote_hosts(syste
         # collect entity metrics file
         # ==============================
         cmdList = ["scp",
-                   hostname + ":" + metricsPathName + "/*",
+                   hostname + ":" + rmtMetricsPathName + "/*",
                    metricsPathName]
         cmdStr  = " ".join(cmdList)
         logger.debug("executing command [" + cmdStr + "]", extra=d)
@@ -192,8 +200,13 @@ def collect_logs_from_remote_hosts(syste
     # collect dashboards file
     # ==============================
     dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id,
"dashboards")
+    rmtDashboardsPathName = dashboardsPathName
+
+    if hostname != "localhost":
+        rmtDashboardsPathName  = replace_kafka_home(dashboardsPathName, kafkaHome)
+
     cmdList = ["scp",
-               hostname + ":" + dashboardsPathName + "/*",
+               hostname + ":" + rmtDashboardsPathName + "/*",
                dashboardsPathName]
     cmdStr  = " ".join(cmdList)
     logger.debug("executing command [" + cmdStr + "]", extra=d)
@@ -207,6 +220,7 @@ def generate_testcase_log_dirs_in_remote
         hostname   = clusterEntityConfigDict["hostname"]
         entity_id  = clusterEntityConfigDict["entity_id"]
         role       = clusterEntityConfigDict["role"]
+        kafkaHome  = clusterEntityConfigDict["kafka_home"]
 
         logger.debug("entity_id : " + entity_id, extra=d)
         logger.debug("hostname  : " + hostname, extra=d)
@@ -216,6 +230,11 @@ def generate_testcase_log_dirs_in_remote
         metricsPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id,
"metrics")
         dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id,
"dashboards")
 
+        if hostname != "localhost":
+            configPathName     = replace_kafka_home(configPathName, kafkaHome)
+            metricsPathName    = replace_kafka_home(metricsPathName, kafkaHome)
+            dashboardsPathName = replace_kafka_home(dashboardsPathName, kafkaHome)
+
         cmdList = ["ssh " + hostname,
                    "'mkdir -p",
                    configPathName,
@@ -443,9 +462,14 @@ def scp_file_to_remote_host(clusterEntit
 
     for clusterEntityConfigDict in clusterEntityConfigDictList:
         hostname         = clusterEntityConfigDict["hostname"]
-        testcasePathName = testcaseEnv.testCaseBaseDir
+        kafkaHome        = clusterEntityConfigDict["kafka_home"]
+        localTestcasePathName  = testcaseEnv.testCaseBaseDir
+        remoteTestcasePathName = localTestcasePathName
+
+        if hostname != "localhost":
+            remoteTestcasePathName = replace_kafka_home(localTestcasePathName, kafkaHome)
 
-        cmdStr = "scp " + testcasePathName + "/config/* " + hostname + ":" + testcasePathName
+ "/config"
+        cmdStr = "scp " + localTestcasePathName + "/config/* " + hostname + ":" + remoteTestcasePathName
+ "/config"
         logger.debug("executing command [" + cmdStr + "]", extra=d)
         system_test_utils.sys_call(cmdStr)
 
@@ -586,10 +610,16 @@ def get_leader_elected_log_line(systemTe
 
         hostname   = system_test_utils.get_data_by_lookup_keyval( \
                          clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
+        kafkaHome  = system_test_utils.get_data_by_lookup_keyval( \
+                         clusterEntityConfigDictList, "entity_id", brokerEntityId, "kafka_home")
         logFile    = system_test_utils.get_data_by_lookup_keyval( \
                          testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
 
         logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId,
"default")
+
+        if hostname != "localhost":
+            logPathName = replace_kafka_home(logPathName, kafkaHome)
+
         cmdStrList = ["ssh " + hostname,
                       "\"grep -i -h '" + leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"]
+ "' ",
                       logPathName + "/" + logFile + " | ",
@@ -659,6 +689,10 @@ def start_entity_in_background(systemTes
     configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "config")
     logPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "default")
 
+    if hostname != "localhost":
+        configPathName = replace_kafka_home(configPathName, kafkaHome)
+        logPathName    = replace_kafka_home(logPathName, kafkaHome)
+
     if role == "zookeeper":
         cmdList = ["ssh " + hostname,
                   "'JAVA_HOME=" + javaHome,
@@ -728,10 +762,15 @@ def start_console_consumer(systemTestEnv
         jmxPort           = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id",
entityId, "jmx_port")
         kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
 
-
         logger.info("starting console consumer", extra=d)
 
-        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer",
entityId, "default")
+        consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer",
entityId, "default")
+        metricsDir      = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer",
entityId, "metrics"),
+
+        if host != "localhost":
+            consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome)
+            #metricsDir      = replace_kafka_home(metricsDir, kafkaHome)
+
         consumerLogPathName = consumerLogPath + "/console_consumer.log"
 
         testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName
@@ -741,7 +780,6 @@ def start_console_consumer(systemTestEnv
         topic     = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id",
entityId, "topic")
         timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id",
entityId, "consumer-timeout-ms")
 
-
         formatterOption = ""
         try:
             formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id",
entityId, "formatter")
@@ -768,7 +806,7 @@ def start_console_consumer(systemTestEnv
                    "--topic " + topic,
                    "--consumer-timeout-ms " + timeoutMs,
                    "--csv-reporter-enabled",
-                   "--metrics-dir " + get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer",
entityId, "metrics"),
+                   #"--metrics-dir " + metricsDir,
                    formatterOption,
                    "--from-beginning ",
                    " >> " + consumerLogPathName,
@@ -860,7 +898,13 @@ def start_producer_in_thread(testcaseEnv
 
     logger.info("starting producer preformance", extra=d)
 
-    producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance",
entityId, "default")
+    producerLogPath  = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance",
entityId, "default")
+    metricsDir       = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance",
entityId, "metrics")
+
+    if host != "localhost":
+        producerLogPath = replace_kafka_home(producerLogPath, kafkaHome)
+        metricsDir      = replace_kafka_home(metricsDir, kafkaHome)
+
     producerLogPathName = producerLogPath + "/producer_performance.log"
 
     testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName
@@ -898,7 +942,7 @@ def start_producer_in_thread(testcaseEnv
                        "--producer-retry-backoff-ms " + retryBackoffMs,
                        "--producer-num-retries " + numOfRetries,
                        "--csv-reporter-enabled",
-                       "--metrics-dir " + get_testcase_config_log_dir_pathname(testcaseEnv,
"producer_performance", entityId, "metrics"),
+                       "--metrics-dir " + metricsDir,
                        boolArgumentsStr,
                        " >> " + producerLogPathName,
                        " & echo pid:$! > " + producerLogPath + "/entity_" + entityId
+ "_pid'"]
@@ -1016,6 +1060,11 @@ def create_topic(systemTestEnv, testcase
         else:
             raise Exception("Empty zkConnectStr found")
 
+        testcaseBaseDir = testcaseEnv.testCaseBaseDir
+
+        if zkHost != "localhost":
+            testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome)
+
         for topic in topicsList:
             logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d)

             cmdList = ["ssh " + zkHost,
@@ -1025,7 +1074,7 @@ def create_topic(systemTestEnv, testcase
                        " --zookeeper " + zkConnectStr,
                        " --replica "   + testcaseEnv.testcaseArgumentsDict["replica_factor"],
                        " --partition " + testcaseEnv.testcaseArgumentsDict["num_partition"]
+ " >> ",
-                       testcaseEnv.testCaseBaseDir + "/logs/create_source_cluster_topic.log'"]
+                       testcaseBaseDir + "/logs/create_source_cluster_topic.log'"]
     
             cmdStr = " ".join(cmdList)
             logger.debug("executing command: [" + cmdStr + "]", extra=d)
@@ -1159,6 +1208,23 @@ def cleanup_data_at_remote_hosts(systemT
 
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
     testcaseConfigsList         = testcaseEnv.testcaseConfigsList
+    testCaseBaseDir             = testcaseEnv.testCaseBaseDir
+
+    # clean up the following directories in localhost
+    #     system_test/<xxxx_testsuite>/testcase_xxxx/config
+    #     system_test/<xxxx_testsuite>/testcase_xxxx/dashboards
+    #     system_test/<xxxx_testsuite>/testcase_xxxx/logs
+    logger.info("cleaning up test case dir: [" + testCaseBaseDir + "]", extra=d)
+
+    if "system_test" not in testCaseBaseDir:
+        logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
+        logger.warn("check config file: system_test/cluster_config.properties", extra=d)
+        logger.warn("aborting test...", extra=d)
+        sys.exit(1)
+    else:
+        system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/config/*")
+        system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/dashboards/*")
+        system_test_utils.sys_call("rm -rf " + testCaseBaseDir + "/logs/*")
 
     for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
 
@@ -1166,10 +1232,14 @@ def cleanup_data_at_remote_hosts(systemT
         entityId         = clusterEntityConfigDict["entity_id"]
         role             = clusterEntityConfigDict["role"]
         kafkaHome        = clusterEntityConfigDict["kafka_home"]
-        testCaseBaseDir  = testcaseEnv.testCaseBaseDir
         cmdStr           = ""
         dataDir          = ""
 
+        if hostname == "localhost":
+            remoteTestCaseBaseDir = testCaseBaseDir
+        else:
+            remoteTestCaseBaseDir = replace_kafka_home(testCaseBaseDir, kafkaHome)
+
         logger.info("cleaning up data dir on host: [" + hostname + "]", extra=d)
 
         if role == 'zookeeper':
@@ -1199,26 +1269,31 @@ def cleanup_data_at_remote_hosts(systemT
         # ============================
         if system_test_utils.remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
             # so kafkaHome is a real kafka installation
-            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.log' |
xargs rm 2> /dev/null\""
+            cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.log'
| xargs rm 2> /dev/null\""
             logger.debug("executing command [" + cmdStr + "]", extra=d)
             system_test_utils.sys_call(cmdStr)
 
-            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*_pid' |
xargs rm 2> /dev/null\""
+            cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*_pid'
| xargs rm 2> /dev/null\""
             logger.debug("executing command [" + cmdStr + "]", extra=d)
             system_test_utils.sys_call(cmdStr)
 
-            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.csv' |
xargs rm 2> /dev/null\""
+            cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.csv'
| xargs rm 2> /dev/null\""
             logger.debug("executing command [" + cmdStr + "]", extra=d)
             system_test_utils.sys_call(cmdStr)
 
-            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.svg' |
xargs rm 2> /dev/null\""
+            cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.svg'
| xargs rm 2> /dev/null\""
             logger.debug("executing command [" + cmdStr + "]", extra=d)
             system_test_utils.sys_call(cmdStr)
 
-            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.html'
| xargs rm 2> /dev/null\""
+            cmdStr = "ssh " + hostname + " \"find " + remoteTestCaseBaseDir + " -name '*.html'
| xargs rm 2> /dev/null\""
             logger.debug("executing command [" + cmdStr + "]", extra=d)
             system_test_utils.sys_call(cmdStr)
 
+def replace_kafka_home(systemTestSubDirPath, kafkaHome):
+    matchObj = re.match(".*(\/system_test\/.*)$", systemTestSubDirPath)
+    relativeSubDirPath = matchObj.group(1)
+    return kafkaHome + relativeSubDirPath
+
 def get_entity_log_directory(testCaseBaseDir, entity_id, role):
     return testCaseBaseDir + "/logs/" + role + "-" + entity_id
 
@@ -1602,6 +1677,9 @@ def start_simple_consumer(systemTestEnv,
         kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
         consumerLogPath   = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer",
entityId, "default")
 
+        if host != "localhost":
+            consumerLogPath = replace_kafka_home(consumerLogPath, kafkaHome)
+
         # testcase configurations:
         testcaseList = testcaseEnv.testcaseConfigsList
         topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId,
"topic")



Mime
View raw message