kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1401875 [4/4] - in /incubator/kafka/branches/0.8/system_test: ./ migration_tool_testsuite/ migration_tool_testsuite/0.7/ migration_tool_testsuite/0.7/bin/ migration_tool_testsuite/0.7/lib/ migration_tool_testsuite/config/ migration_tool_te...
Date Wed, 24 Oct 2012 20:56:21 GMT
Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1401875&r1=1401874&r2=1401875&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Wed Oct 24 20:56:16 2012
@@ -22,6 +22,7 @@
 
 import datetime
 import getpass
+import hashlib
 import inspect
 import json
 import logging
@@ -80,6 +81,8 @@ def get_testcase_config_log_dir_pathname
     # type is either "metrics" or "dashboards" or "default"
     if type == "metrics":
         return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId + "/metrics"
+    elif type == "log_segments" :
+        return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId + "/log_segments"
     elif type == "default" :
         return testcaseEnv.testCaseLogsDir + "/" + role + "-" + entityId
     elif type == "dashboards":
@@ -110,19 +113,19 @@ def generate_testcase_log_dirs(systemTes
 
         metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entityId, "metrics")
         if not os.path.exists(metricsPathName) : os.makedirs(metricsPathName)
-        
+
         # create the role directory under dashboards
         dashboardsRoleDir = dashboardsPathName + "/" + role
         if not os.path.exists(dashboardsRoleDir) : os.makedirs(dashboardsRoleDir)
         
 
-
 def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv):
     anonLogger.info("================================================")
     anonLogger.info("collecting logs from remote machines")
     anonLogger.info("================================================")
 
     testCaseBaseDir = testcaseEnv.testCaseBaseDir
+    tcConfigsList   = testcaseEnv.testcaseConfigsList
 
     for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
         hostname   = clusterEntityConfigDict["hostname"]
@@ -157,6 +160,20 @@ def collect_logs_from_remote_hosts(syste
         logger.debug("executing command [" + cmdStr + "]", extra=d)
         system_test_utils.sys_call(cmdStr)
 
+        # ==============================
+        # collect broker log segment file
+        # ==============================
+        if role == "broker":
+            dataLogPathName = system_test_utils.get_data_by_lookup_keyval(
+                                  testcaseEnv.testcaseConfigsList, "entity_id", entity_id, "log.dir")
+
+            cmdList = ["scp -r",
+                       hostname + ":" + dataLogPathName,
+                       logPathName]
+            cmdStr  = " ".join(cmdList)
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
     # ==============================
     # collect dashboards file
     # ==============================
@@ -321,7 +338,8 @@ def generate_overriden_props_files(tests
             testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"][key] = val
 
         else:
-            logger.error("Unknown cluster name: " + clusterName)
+            logger.error("Invalid cluster name: " + clusterName, extra=d)
+            raise Exception("Invalid cluster name : " + clusterName)
             sys.exit(1)
 
     # update broker cluster info into "testcaseEnv.userDefinedEnvVarDict"
@@ -344,7 +362,8 @@ def generate_overriden_props_files(tests
             else:
                testcaseEnv.userDefinedEnvVarDict["targetBrokerList"] += "," + hostname + ":" + port
         else:
-            logger.error("Unknown cluster name: " + clusterName)
+            logger.error("Invalid cluster name: " + clusterName, extra=d)
+            raise Exception("Invalid cluster name : " + clusterName)
             sys.exit(1)
 
     # for each entity in the cluster config
@@ -363,7 +382,7 @@ def generate_overriden_props_files(tests
                     elif clusterCfg["cluster_name"] == "target":
                         tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
                     else:
-                        logger.error("Unknown cluster name: " + clusterName)
+                        logger.error("Unknown cluster name: " + clusterName, extra=d)
                         sys.exit(1)
 
                     addedCSVConfig = {}
@@ -384,7 +403,7 @@ def generate_overriden_props_files(tests
                             cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg,
                             testcaseEnv.userDefinedEnvVarDict["targetZkHostPortDict"])
                     else:
-                        logger.error("Unknown cluster name: " + clusterName)
+                        logger.error("Unknown cluster name: " + clusterName, extra=d)
                         sys.exit(1)
 
                 elif ( clusterCfg["role"] == "mirror_maker"):
@@ -397,32 +416,6 @@ def generate_overriden_props_files(tests
                     copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties",
                         cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None)
                 
-                elif ( clusterCfg["role"] == "producer_performance" ):
-                    copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties",
-                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, None)
-
-                elif ( clusterCfg["role"] == "console_consumer" ):
-                    copy_file_with_dict_values(cfgTemplatePathname + "/console_consumer.properties",
-                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, None)
-                
-                elif ( clusterCfg["role"] == "producer" ):
-                    addedCSVConfig = {}
-                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "producer", clusterCfg["entity_id"], "metrics") 
-                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
-                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter"
-                    addedCSVConfig["kfka.metrics.polling.interval.secsafka.csv.metrics.reporter.enabled"] = "true"
-                    copy_file_with_dict_values(cfgTemplatePathname + "/producer.properties",
-                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, addedCSVConfig)
-
-
-                elif ( clusterCfg["role"] == "consumer" ):
-                    addedCSVConfig = {}
-                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "consumer", clusterCfg["entity_id"], "metrics") 
-                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
-                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter"
-                    addedCSVConfig["kfka.metrics.polling.interval.secsafka.csv.metrics.reporter.enabled"] = "true"
-                    copy_file_with_dict_values(cfgTemplatePathname + "/consumer.properties",
-                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, addedCSVConfig)
                 else:
                     logger.debug("UNHANDLED role " + clusterCfg["role"], extra=d)
 
@@ -747,7 +740,7 @@ def start_console_consumer(systemTestEnv
         elif clusterName == "target":
             zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
         else:
-            logger.error("Invalid cluster name : " + clusterName)
+            logger.error("Invalid cluster name : " + clusterName, extra=d)
             sys.exit(1)
 
         cmdList = ["ssh " + host,
@@ -836,7 +829,7 @@ def start_producer_in_thread(testcaseEnv
     elif clusterName == "target":
         brokerListStr  = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]
     else:
-        logger.error("Unknown cluster name: " + clusterName)
+        logger.error("Unknown cluster name: " + clusterName, extra=d)
         sys.exit(1)
 
     logger.info("starting producer preformance", extra=d)
@@ -942,14 +935,21 @@ def start_producer_in_thread(testcaseEnv
             testcaseEnv.lock.release()
         time.sleep(1)
 
-def stop_remote_entity(systemTestEnv, entityId, parentPid):
+def stop_remote_entity(systemTestEnv, entityId, parentPid, signalType="SIGTERM"):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
     hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
     pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
 
-    logger.debug("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
-    system_test_utils.sigterm_remote_process(hostname, pidStack)
+    logger.debug("terminating (" + signalType + ") process id: " + parentPid + " in host: " + hostname, extra=d)
+
+    if signalType.lower() == "sigterm":
+        system_test_utils.sigterm_remote_process(hostname, pidStack)
+    elif signalType.lower() == "sigkill":
+        system_test_utils.sigkill_remote_process(hostname, pidStack)
+    else:
+        logger.error("Invalid signal type: " + signalType, extra=d)
+        raise Exception("Invalid signal type: " + signalType)
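
For reference, callers can now pick the shutdown signal; a minimal usage sketch (the entity id and parent pid values below are hypothetical, in the suite they come from the cluster config and the recorded pid file):

    stop_remote_entity(systemTestEnv, "3", "12345")              # default graceful stop (SIGTERM)
    stop_remote_entity(systemTestEnv, "3", "12345", "SIGKILL")   # hard kill, e.g. for failure testing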
 
 
 def force_stop_remote_entity(systemTestEnv, entityId, parentPid):
@@ -1048,6 +1048,7 @@ def validate_data_matched(systemTestEnv,
     for prodPerfCfg in prodPerfCfgList:
         producerEntityId = prodPerfCfg["entity_id"]
         topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+        acks  = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
 
         consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
                            clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
@@ -1089,11 +1090,15 @@ def validate_data_matched(systemTestEnv,
 
         if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
-            #return True
+        elif (acks == "1"):
+            missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
+            print "#### missing Percent : ", missingPercentage
+            if missingPercentage <= 1:
+                validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
+                logger.warn("Test case passes with less than 1% data loss : [" + str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
        else:
            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
            logger.info("See " + msgIdMissingInConsumerLogPathName + " for missing MessageID", extra=d)
-            #return False
 
 
 def validate_leader_election_successful(testcaseEnv, leaderDict, validationStatusDict):
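
For reference, the acks=1 tolerance added above amounts to the following check; the message-id sets here are hypothetical stand-ins for what the producer and consumer logs actually yield:

    producer_ids = set(str(i) for i in range(1000))   # hypothetical ids sent with request-num-acks=1
    consumer_ids = set(str(i) for i in range(995))    # hypothetical ids actually consumed
    missing      = producer_ids - consumer_ids
    missing_pct  = len(missing) * 100 / len(producer_ids)   # integer division, as in the suite (Python 2)
    passed       = len(missing) == 0 or missing_pct <= 1    # acks=1 tolerates up to ~1% loss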
@@ -1215,6 +1220,7 @@ def get_reelection_latency(systemTestEnv
     leaderBrokerId = None
     leaderPPid     = None
     shutdownLeaderTimestamp = None
+    leaderReElectionLatency = -1
 
     if testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
         # leader election is not successful - something is wrong => so skip this testcase
@@ -1230,131 +1236,43 @@ def get_reelection_latency(systemTestEnv
             raise
 
         logger.info("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid, extra=d)
-        stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid)
+        signalType = None
+        try:
+            signalType = testcaseEnv.testcaseArgumentsDict["signal_type"]
+        except:
+            pass
+
+        if signalType is None or signalType.lower() == "sigterm":
+            stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid)
+        elif signalType.lower() == "sigkill":
+            stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid, "SIGKILL")
+        else:
+            logger.error("Unsupported signal type: " + signalType, extra=d)
+            raise Exception("Unsupported signal type: " + signalType)
 
     logger.info("sleeping for 10s for leader re-election to complete", extra=d)
     time.sleep(10)
 
     # get broker shut down completed timestamp
     shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict)
-    logger.debug("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownBrokerDict["timestamp"])), extra=d)
+    shutdownTimestamp  = -1
 
-    logger.info("looking up new leader", extra=d)
+    try:
+        shutdownTimestamp = shutdownBrokerDict["timestamp"]
+        logger.debug("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownTimestamp)), extra=d)
+    except:
+        logger.warn("unable to find broker shut down timestamp", extra=d)
 
+    logger.info("looking up new leader", extra=d)
     leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict)
     logger.debug("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])), extra=d)
 
-    leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownBrokerDict["timestamp"])
-    logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
+    if shutdownTimestamp > 0:
+        leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownTimestamp)
+        logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
  
     return leaderReElectionLatency
 
-def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv):
-
-    anonLogger.info("================================================")
-    anonLogger.info("validating broker log segment checksums")
-    anonLogger.info("================================================")
-
-    # brokerLogCksumDict -
-    #   a dictionary to keep track of log segment files in each brokers and will look like this:
-    #
-    # {u'broker-1': {'test_1-0/00000000000000000000.kafka': '91500855',
-    #                'test_1-0/00000000000000010255.kafka': '1906285795',
-    #                'test_1-1/00000000000000000000.kafka': '3785861722',
-    #                'test_1-1/00000000000000010322.kafka': '1731628308'},
-    #  u'broker-2': {'test_1-0/00000000000000000000.kafka': '91500855',
-    #                'test_1-0/00000000000000010255.kafka': '1906285795',
-    #                'test_1-1/00000000000000000000.kafka': '3785861722',
-    #                'test_1-1/00000000000000010322.kafka': '1731628308'},
-    #  u'broker-3': {'test_1-0/00000000000000000000.kafka': '91500855',
-    #                'test_1-0/00000000000000010255.kafka': '1906285795',
-    #                'test_1-1/00000000000000000000.kafka': '3431356313'}}
-    brokerLogCksumDict = {}
-
-    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "broker", "entity_id")
-
-    # access all brokers' hosts to get broker id and its corresponding log
-    # segment file checksums and populate brokerLogCksumDict
-    for brokerEntityId in brokerEntityIdList:
-        logCksumDict       = {}
-
-        hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
-        logDir   = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log.dir")
-
-        # get the log segment file full path name
-        cmdStr   = "ssh " + hostname + " \"find " + logDir + " -name '*.log'\" 2> /dev/null"
-        logger.debug("executing command [" + cmdStr + "]", extra=d)
-        subproc  = system_test_utils.sys_call_return_subproc(cmdStr)
-        for line in subproc.stdout.readlines():
-            # Need a key to identify each corresponding log segment file in different brokers:
-            #   This can be achieved by using part of the full log segment file path as a key to identify
-            #   the individual log segment file checksums. The key can be extracted from the path name
-            #   and starting from "topic-partition" such as:
-            #     full log segment path name   : /tmp/kafka_server_1_logs/test_1-0/00000000000000010255.kafka
-            #     part of the path name as key : test_1-0/00000000000000010255.kafka
-            logSegmentPathName = line.rstrip('\n')
-            substrIndex        = logSegmentPathName.index(logDir) + len(logDir + "/")
-            logSegmentFile     = logSegmentPathName[substrIndex:]
-
-            # get log segment file checksum
-            cksumCmdStr = "ssh " + hostname + " \"cksum " + logSegmentPathName + " | cut -f1 -d ' '\" 2> /dev/null"
-            subproc2 = system_test_utils.sys_call_return_subproc(cksumCmdStr)
-            for line2 in subproc2.stdout.readlines():
-                checksum = line2.rstrip('\n')
-                # use logSegmentFile as a key to associate with its checksum
-                logCksumDict[logSegmentFile] = checksum
-
-        # associate this logCksumDict with its broker id
-        brokerLogCksumDict["broker-"+brokerEntityId] = logCksumDict
-
-    # use a list of sets for checksums comparison
-    sets = []
-    for brokerId, logCksumDict in brokerLogCksumDict.items():
-        sets.append( set(logCksumDict.items()) )
-
-    # looping through the sets and compare between broker[n] & broker[n+1] ...
-    idx = 0
-    diffItemSet = None
-    while idx < len(sets) - 1:
-        diffItemSet = sets[idx] ^ sets[idx + 1]
-
-        if (len(diffItemSet) > 0):
-            logger.error("Mismatch found : " + str(diffItemSet), extra=d)
-            testcaseEnv.validationStatusDict["Log segment checksum matching across all replicas"] = "FAILED"
-
-            # get the mismatched items key, i.e. the log segment file name
-            diffItemList = list(diffItemSet)
-            diffItemKeys = []
-            for keyvalSet in diffItemList:
-                keyvalList = list(keyvalSet)
-                diffItemKeys.append(keyvalList[0])
-
-            # mismatch found - so print out the whole log segment file checksum
-            # info with the mismatched checksum highlighted
-            for brokerId in sorted(brokerLogCksumDict.iterkeys()):
-                logCksumDict = brokerLogCksumDict[brokerId]
-                print brokerId,":"
-                for logSegmentFile in sorted(logCksumDict.iterkeys()):
-                    checksum = logCksumDict[logSegmentFile]
-                    sys.stdout.write(logSegmentFile + " => " + checksum)
-                    try:
-                        if diffItemKeys.index(logSegmentFile) >= 0:
-                            sys.stdout.write("    <<<< not matching across all replicas")
-                    except:
-                        pass
-                    print
-                print
-            return
-        idx += 1
-
-    # getting here means all log segment checksums matched
-    testcaseEnv.validationStatusDict["Log segment checksum matching across all replicas"] = "PASSED"
-
-    anonLogger.info("log segment files checksum :")
-    print
-    pprint.pprint(brokerLogCksumDict)
-    print
 
 def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
 
@@ -1488,4 +1406,320 @@ def validate_07_08_migrated_data_matched
             logger.info("See " + msgChecksumMissingInConsumerLogPathName + " for missing MessageID", extra=d)
             #return False
 
+def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName="source"):
+
+    anonLogger.info("================================================")
+    anonLogger.info("validating merged broker log segment checksums")
+    anonLogger.info("================================================")
+
+    brokerLogCksumDict   = {}
+    testCaseBaseDir      = testcaseEnv.testCaseBaseDir
+    tcConfigsList        = testcaseEnv.testcaseConfigsList
+    validationStatusDict = testcaseEnv.validationStatusDict
+    clusterConfigList    = systemTestEnv.clusterEntityConfigDictList
+    #brokerEntityIdList   = system_test_utils.get_data_from_list_of_dicts(clusterConfigList, "role", "broker", "entity_id")
+    allBrokerConfigList  = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "broker")
+    brokerEntityIdList   = system_test_utils.get_data_from_list_of_dicts(allBrokerConfigList, "cluster_name", clusterName, "entity_id")
+
+    # loop through all brokers
+    for brokerEntityId in brokerEntityIdList:
+        logCksumDict   = {}
+        # remoteLogSegmentPathName : /tmp/kafka_server_4_logs
+        # => remoteLogSegmentDir   : kafka_server_4_logs
+        remoteLogSegmentPathName = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", brokerEntityId, "log.dir")
+        remoteLogSegmentDir      = os.path.basename(remoteLogSegmentPathName)
+        logPathName              = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
+        localLogSegmentPath      = logPathName + "/" + remoteLogSegmentDir
+
+        # localLogSegmentPath :
+        # .../system_test/mirror_maker_testsuite/testcase_5002/logs/broker-4/kafka_server_4_logs
+        #   |- test_1-0
+        #        |- 00000000000000000000.index
+        #        |- 00000000000000000000.log
+        #        |- 00000000000000000020.index
+        #        |- 00000000000000000020.log
+        #        |- . . .
+        #   |- test_1-1
+        #        |- 00000000000000000000.index
+        #        |- 00000000000000000000.log
+        #        |- 00000000000000000020.index
+        #        |- 00000000000000000020.log
+        #        |- . . .
+
+        # loop through all topicPartition directories such as : test_1-0, test_1-1, ... 
+        for topicPartition in os.listdir(localLogSegmentPath):
+            # found a topic-partition directory
+            if os.path.isdir(localLogSegmentPath + "/" + topicPartition):
+                # md5 hasher
+                m = hashlib.md5()
+
+                # logSegmentKey is like this : kafka_server_9_logs:test_1-0 (delimited by ':')
+                logSegmentKey = remoteLogSegmentDir + ":" + topicPartition
+
+                # log segment files are located in : localLogSegmentPath + "/" + topicPartition
+                # sort the log segment files under each topic-partition and get the md5 checksum
+                for logFile in sorted(os.listdir(localLogSegmentPath + "/" + topicPartition)):
+                    # only process log file: *.log
+                    if logFile.endswith(".log"):
+                        # read the log segment file as binary
+                        offsetLogSegmentPathName = localLogSegmentPath + "/" + topicPartition + "/" + logFile
+                        fin = file(offsetLogSegmentPathName, 'rb')
+                        # keep reading 64K max at a time
+                        while True:
+                            data = fin.read(65536)
+                            if not data:
+                                fin.close()
+                                break
+                            # update it into the hasher
+                            m.update(data)
+
+                # update the md5 checksum into brokerLogCksumDict with the corresponding key
+                brokerLogCksumDict[logSegmentKey] = m.hexdigest()
+
+    # print it out to the console for reference
+    pprint.pprint(brokerLogCksumDict)
+
+    # brokerLogCksumDict will look like this:
+    # {
+    #   'kafka_server_1_logs:tests_1-0': 'd41d8cd98f00b204e9800998ecf8427e',
+    #   'kafka_server_1_logs:tests_1-1': 'd41d8cd98f00b204e9800998ecf8427e',
+    #   'kafka_server_1_logs:tests_2-0': 'd41d8cd98f00b204e9800998ecf8427e',
+    #   'kafka_server_1_logs:tests_2-1': 'd41d8cd98f00b204e9800998ecf8427e',
+    #   'kafka_server_2_logs:tests_1-0': 'd41d8cd98f00b204e9800998ecf8427e',
+    #   'kafka_server_2_logs:tests_1-1': 'd41d8cd98f00b204e9800998ecf8427e',
+    #   'kafka_server_2_logs:tests_2-0': 'd41d8cd98f00b204e9800998ecf8427e',
+    #   'kafka_server_2_logs:tests_2-1': 'd41d8cd98f00b204e9800998ecf8427e'
+    # }
+
+    checksumDict = {}
+    # organize the checksum according to their topic-partition and checksumDict will look like this:
+    # {
+    #   'test_1-0' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'],
+    #   'test_1-1' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'],
+    #   'test_2-0' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'],
+    #   'test_2-1' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e']
+    # }
+  
+    for brokerTopicPartitionKey, md5Checksum in brokerLogCksumDict.items():
+        tokens = brokerTopicPartitionKey.split(":")
+        brokerKey      = tokens[0]
+        topicPartition = tokens[1]
+        if topicPartition in checksumDict:
+            # key already exist
+            checksumDict[topicPartition].append(md5Checksum)
+        else:
+            # new key => create a new list to store checksum
+            checksumDict[topicPartition] = []
+            checksumDict[topicPartition].append(md5Checksum)
+
+    failureCount = 0
+
+    # loop through checksumDict: the checksums should be the same inside each
+    # topic-partition's list. Otherwise, checksum mismatched is detected
+    for topicPartition, checksumList in checksumDict.items():
+        checksumSet = frozenset(checksumList)
+        if len(checksumSet) > 1:
+            failureCount += 1
+            logger.error("merged log segment checksum in " + topicPartition + " mismatched", extra=d)
+        elif len(checksumSet) == 1:
+            logger.debug("merged log segment checksum in " + topicPartition + " matched", extra=d)
+        else:
+            logger.error("unexpected error in " + topicPartition, extra=d)
+            
+    if failureCount == 0:
+        validationStatusDict["Validate for merged log segment checksum in cluster [" + clusterName + "]"] = "PASSED"
+    else:
+        validationStatusDict["Validate for merged log segment checksum in cluster [" + clusterName + "]"] = "FAILED"
+
+def start_simple_consumer(systemTestEnv, testcaseEnv):
+
+    clusterList        = systemTestEnv.clusterEntityConfigDictList
+    consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "console_consumer")
+    for consumerConfig in consumerConfigList:
+        host              = consumerConfig["hostname"]
+        entityId          = consumerConfig["entity_id"]
+        jmxPort           = consumerConfig["jmx_port"] 
+        clusterName       = consumerConfig["cluster_name"] 
+        kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home")
+        javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home")
+        kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
+        consumerLogPath   = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default")
+
+        # testcase configurations:
+        testcaseList = testcaseEnv.testcaseConfigsList
+        topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")
+
+        brokerListStr = ""
+        if clusterName == "source":
+            brokerListStr  = testcaseEnv.userDefinedEnvVarDict["sourceBrokerList"]
+        elif clusterName == "target":
+            brokerListStr  = testcaseEnv.userDefinedEnvVarDict["targetBrokerList"]
+        else:
+            logger.error("Invalid cluster name : " + clusterName, extra=d)
+            raise Exception("Invalid cluster name : " + clusterName)
+
+        if len(brokerListStr) == 0:
+            logger.error("Empty broker list str", extra=d)
+            raise Exception("Empty broker list str")
+
+        numPartitions = None
+        try:
+            numPartitions = testcaseEnv.testcaseArgumentsDict["num_partition"]
+        except:
+            pass
+
+        if numPartitions is None:
+            logger.error("Invalid no. of partitions: " + numPartitions, extra=d)
+            raise Exception("Invalid no. of partitions: " + numPartitions)
+        else:
+            numPartitions = int(numPartitions)
+
+        replicaIndex   = 1
+        brokerPortList = brokerListStr.split(',')
+        for brokerPort in brokerPortList:
+
+            k = 0
+            while (k < numPartitions):
+                logger.info("starting debug consumer for replica on [" + brokerPort + "] partition [" + str(k) + "]", extra=d)
+                brokerPortLabel = brokerPort.replace(":", "_")
+                cmdList = ["ssh " + host,
+                           "'JAVA_HOME=" + javaHome,
+                           kafkaRunClassBin + " kafka.tools.SimpleConsumerShell",
+                           "--broker-list " + brokerListStr,
+                           "--topic " + topic,
+                           "--partition " + str(k),
+                           "--replica " + str(replicaIndex),
+                           "--no-wait-at-logend ",
+                           " >> " + consumerLogPath + "/simple_consumer_" + str(replicaIndex) + ".log",
+                           " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"]
+    
+                cmdStr = " ".join(cmdList)
+    
+                logger.debug("executing command: [" + cmdStr + "]", extra=d)
+                system_test_utils.async_sys_call(cmdStr)
+                time.sleep(2)
+    
+                pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'"
+                logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+                subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+    
+                # keep track of the remote entity pid in a dictionary
+                for line in subproc.stdout.readlines():
+                    if line.startswith("pid"):
+                        line = line.rstrip('\n')
+                        logger.debug("found pid line: [" + line + "]", extra=d)
+                        tokens = line.split(':')
+                        testcaseEnv.consumerHostParentPidDict[host] = tokens[1]
+    
+                logger.info("sleeping for 5 sec",extra=d)
+                time.sleep(5)
+                k += 1
+            replicaIndex += 1
+
+def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv):
+    validationStatusDict        = testcaseEnv.validationStatusDict
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
+    consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
+
+    mismatchCount = 0
+
+    for prodPerfCfg in prodPerfCfgList:
+        producerEntityId = prodPerfCfg["entity_id"]
+        topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+        acks  = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
+        logger.debug("request-num-acks [" + acks + "]", extra=d)
+
+        consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
+                           clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
+
+        matchingConsumerEntityId = None
+        for consumerEntityId in consumerEntityIdList:
+            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            if consumerTopic in topic:
+                matchingConsumerEntityId = consumerEntityId
+                break
+
+        if matchingConsumerEntityId is None:
+            break
+
+        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default")
+        producerLogPathName = producerLogPath + "/producer_performance.log"
+        producerMsgIdList   = get_message_id(producerLogPathName)
+        producerMsgIdSet    = set(producerMsgIdList)
+        logger.info("no. of unique messages on topic [" + topic + "] sent from publisher : " + str(len(producerMsgIdSet)), extra=d)
+        validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
+
+        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
+        for logFile in sorted(os.listdir(consumerLogPath)):
+            # only process log file: *.log
+            if logFile.endswith(".log"):
+                consumerLogPathName = consumerLogPath + "/" + logFile
+                consumerMsgIdList   = get_message_id(consumerLogPathName)
+                consumerMsgIdSet   = set(consumerMsgIdList)
+                missingMsgIdInConsumer = producerMsgIdSet - consumerMsgIdSet
+                msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \
+                    testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") + \
+                    "/" + logFile + "_msg_id_missing_in_consumer.log"
+
+                outfile = open(msgIdMissingInConsumerLogPathName, "w")
+                for id in missingMsgIdInConsumer:
+                    outfile.write(id + "\n")
+                outfile.close()
+
+                logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d)
+                validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet))
+
+                if acks == "-1" and len(missingMsgIdInConsumer) > 0:
+                    mismatchCount += 1
+                elif acks == "1" and len(missingMsgIdInConsumer) > 0:
+                    missingPercentage = len(missingMsgIdInConsumer) * 100 / len(producerMsgIdSet)
+                    logger.debug("missing percentage [" + str(missingPercentage) + "]", extra=d)
+                    if missingPercentage <= 1:
+                        logger.warn("Test case (acks == 1) passes with < 1% data loss : [" + \
+                            str(len(missingMsgIdInConsumer)) + "] missing messages", extra=d)
+                    else:
+                        mismatchCount += 1
+
+        if mismatchCount == 0:
+            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
+        else:
+            validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
+
+def get_controller_attributes(systemTestEnv, testcaseEnv):
+
+    logger.info("Querying Zookeeper for Controller info ...", extra=d)
+
+    # keep track of controller data in this dict such as broker id & entity id
+    controllerDict = {} 
+
+    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
+    tcConfigsList      = testcaseEnv.testcaseConfigsList
+
+    zkDictList         = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
+    firstZkDict        = zkDictList[0]
+    hostname           = firstZkDict["hostname"]
+    zkEntityId         = firstZkDict["entity_id"]
+    clientPort         = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort")
+    kafkaHome          = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home")
+    javaHome           = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home")
+    kafkaRunClassBin   = kafkaHome + "/bin/kafka-run-class.sh"
+
+    cmdStrList = ["ssh " + hostname,
+                  "\"JAVA_HOME=" + javaHome,
+                  kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain",
+                  "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
+                  "'get /controller' 2> /dev/null | tail -1\""]
+
+    cmdStr = " ".join(cmdStrList)
+    logger.debug("executing command [" + cmdStr + "]", extra=d)
+    subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+    for line in subproc.stdout.readlines():
+        brokerid = line.rstrip('\n')
+        controllerDict["brokerid"]  = brokerid
+        controllerDict["entity_id"] = system_test_utils.get_data_by_lookup_keyval(
+                                          tcConfigsList, "brokerid", brokerid, "entity_id")
+    return controllerDict
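
The query above relies on ZooKeeperMain printing the znode data as its last output line; under that assumption the parsing reduces to the following, where the sample output is hypothetical:

    # hypothetical last line printed by "get /controller" on an 0.8 source cluster
    last_line = "2\n"
    controller_brokerid = last_line.rstrip('\n')   # -> "2"
    # the suite then maps "2" back to an entity_id via the "brokerid" field in the testcase configs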
+
 

Modified: incubator/kafka/branches/0.8/system_test/utils/replication_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/replication_utils.py?rev=1401875&r1=1401874&r2=1401875&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/replication_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/replication_utils.py Wed Oct 24 20:56:16 2012
@@ -58,3 +58,10 @@ class ReplicationUtils(object):
             self.leaderAttributesDict["LEADER_ELECTION_COMPLETED_MSG"] + \
             " for topic (.*?) partition (.*?) \(.*"
 
+        # Controller attributes
+        self.isControllerLogPattern    = "Controller startup complete"
+        self.controllerAttributesDict  = {}
+        self.controllerAttributesDict["CONTROLLER_STARTUP_COMPLETE_MSG"] = self.isControllerLogPattern
+        self.controllerAttributesDict["REGX_CONTROLLER_STARTUP_PATTERN"] = "\[(.*?)\] .* \[Controller (.*?)\]: " + \
+            self.controllerAttributesDict["CONTROLLER_STARTUP_COMPLETE_MSG"]
+
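
A quick sketch of how the new controller-startup pattern could be applied to a broker log line; the sample line below is made up but follows the log4j layout the pattern expects:

    import re

    pattern = r"\[(.*?)\] .* \[Controller (.*?)\]: Controller startup complete"
    line    = "[2012-10-24 13:12:08,123] INFO [Controller 1]: Controller startup complete (kafka.controller.KafkaController)"

    match = re.search(pattern, line)
    if match:
        timestamp = match.group(1)   # "2012-10-24 13:12:08,123"
        brokerId  = match.group(2)   # "1"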

Modified: incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py?rev=1401875&r1=1401874&r2=1401875&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py Wed Oct 24 20:56:16 2012
@@ -219,7 +219,6 @@ def get_child_processes(pid):
             break
     return pidStack
 
-
 def sigterm_remote_process(hostname, pidStack):
 
     while ( len(pidStack) > 0 ):
@@ -233,7 +232,6 @@ def sigterm_remote_process(hostname, pid
             print "WARN - pid:",pid,"not found"
             raise
 
-
 def sigkill_remote_process(hostname, pidStack):
 
     while ( len(pidStack) > 0 ):
@@ -247,6 +245,35 @@ def sigkill_remote_process(hostname, pid
             print "WARN - pid:",pid,"not found"
             raise
 
+def simulate_garbage_collection_pause_in_remote_process(hostname, pidStack, pauseTimeInSeconds):
+    pausedPidStack = []
+
+    # pause the processes
+    while len(pidStack) > 0:
+        pid = pidStack.pop()
+        pausedPidStack.append(pid)
+        cmdStr = "ssh " + hostname + " 'kill -SIGSTOP " + pid + "'"
+
+        try:
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            sys_call_return_subproc(cmdStr)
+        except:
+            print "WARN - pid:",pid,"not found"
+            raise
+
+    time.sleep(int(pauseTimeInSeconds))
+
+    # resume execution of the processes
+    while len(pausedPidStack) > 0:
+        pid = pausedPidStack.pop()
+        cmdStr = "ssh " + hostname + " 'kill -SIGCONT " + pid + "'"
+
+        try:
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            sys_call_return_subproc(cmdStr)
+        except:
+            print "WARN - pid:",pid,"not found"
+            raise
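
For reference, a hedged usage sketch of the new helper within this module: pause a broker's JVM and its children to mimic a long GC pause, then let them resume. The hostname and parent pid are hypothetical; in the suite the pid stack comes from get_remote_child_processes:

    hostname  = "localhost"   # hypothetical
    parentPid = "12345"       # hypothetical, normally read from the entity's recorded pid file
    pidStack  = get_remote_child_processes(hostname, parentPid)
    simulate_garbage_collection_pause_in_remote_process(hostname, pidStack, 30)  # 30s SIGSTOP/SIGCONT window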
 
 def terminate_process(pidStack):
     while ( len(pidStack) > 0 ):
@@ -359,6 +386,8 @@ def setup_remote_hosts(systemTestEnv):
 
         if hostname == "localhost" and kafkaHome == "default":
             clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome
+        if hostname == "localhost" and kafkaHome == "system_test/migration_tool_testsuite/0.7":
+            clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome + "/system_test/migration_tool_testsuite/0.7"
 
         kafkaHome = clusterEntityConfigDict["kafka_home"]
         javaHome  = clusterEntityConfigDict["java_home"]


