kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1384854 - in /incubator/kafka/branches/0.8/system_test: replication_testsuite/ replication_testsuite/config/ replication_testsuite/testcase_1/ utils/
Date Fri, 14 Sep 2012 17:20:04 GMT
Author: nehanarkhede
Date: Fri Sep 14 17:20:04 2012
New Revision: 1384854

URL: http://svn.apache.org/viewvc?rev=1384854&view=rev
Log:
KAFKA-449: Leader election test

Modified:
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
    incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
    incubator/kafka/branches/0.8/system_test/utils/testcase_env.py

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties?rev=1384854&r1=1384853&r2=1384854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
(original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
Fri Sep 14 17:20:04 2012
@@ -1,6 +1,4 @@
 topic=mytest
-messages=200
 message-size=100
 thread=5
-initial-message-id=0
 compression-codec=0

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1384854&r1=1384853&r2=1384854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Fri
Sep 14 17:20:04 2012
@@ -82,7 +82,9 @@ class ReplicaBasicTest(SetupUtils):
                 self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
                 self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
     
-                # initialize self.testcaseEnv with user-defined environment
+                # ======================================================================
+                # initialize self.testcaseEnv with user-defined environment variables
+                # ======================================================================
                 self.testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"]
= ReplicaBasicTest.brokerShutDownCompletedPattern
                 self.testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"]
= \
                     "\[(.*?)\] .* \[Kafka Server (.*?)\], " + ReplicaBasicTest.brokerShutDownCompletedPattern
@@ -94,7 +96,9 @@ class ReplicaBasicTest(SetupUtils):
                     " for topic (.*?) partition (.*?) \(.*"
 
                 self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
-    
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]    = False
+                self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False
+
                 # find testcase properties json file
                 testcasePropJsonPathName = system_test_utils.get_testcase_prop_json_pathname(testCasePathName)
                 self.logger.debug("testcasePropJsonPathName : " + testcasePropJsonPathName,
extra=self.d)
@@ -168,7 +172,7 @@ class ReplicaBasicTest(SetupUtils):
                 #    by overriding the settings specified in:
                 #    system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
                 kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName,
self.testcaseEnv, self.systemTestEnv)
-    
+   
                 # =============================================
                 # preparing all entities to start the test
                 # =============================================
@@ -176,62 +180,91 @@ class ReplicaBasicTest(SetupUtils):
                 kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv)
                 self.anonLogger.info("sleeping for 2s")
                 time.sleep(2)
-    
+        
                 self.log_message("starting brokers")
                 kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
-                self.anonLogger.info("sleeping for 5s")
-                time.sleep(5)
-    
+                self.anonLogger.info("sleeping for 2s")
+                time.sleep(2)
+        
                 self.log_message("creating topics")
                 kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
                 self.anonLogger.info("sleeping for 5s")
                 time.sleep(5)
-    
-                self.log_message("looking up leader")
-                leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv,
self.testcaseEnv)
-    
-                # ==========================
-                # leaderDict looks like this:
-                # ==========================
-                #{'entity_id': u'3',
-                # 'partition': '0',
-                # 'timestamp': 1345050255.8280001,
-                # 'hostname': u'localhost',
-                # 'topic': 'test_1',
-                # 'brokerid': '3'}
-    
-                # =============================================
-                # validate to see if leader election is successful
-                # =============================================
-                self.log_message("validating leader election")
-                result = kafka_system_test_utils.validate_leader_election_successful( \
-                             self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
-    
-                # =============================================
-                # get leader re-election latency
-                # =============================================
-                bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
-                self.log_message("bounce_leader flag : " + bounceLeaderFlag)
-                if (bounceLeaderFlag.lower() == "true"):
-                    reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv,
self.testcaseEnv, leaderDict)
-
+        
                 # =============================================
                 # starting producer 
                 # =============================================
                 self.log_message("starting producer in the background")
                 kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
-                self.anonLogger.info("sleeping for 10s")
-                time.sleep(10)
-    
-                # =============================================
-                # starting previously terminated broker 
-                # =============================================
-                if bounceLeaderFlag.lower() == "true":
-                    self.log_message("starting the previously terminated broker")
-                    stoppedLeaderEntityId  = leaderDict["entity_id"]
-                    kafka_system_test_utils.start_entity_in_background(self.systemTestEnv,
self.testcaseEnv, stoppedLeaderEntityId)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                i = 1
+                numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"])
+                while i <= numIterations:
+
+                    self.log_message("Iteration " + str(i) + " of " + str(numIterations))
+
+                    # looking up leader
+                    leaderDict = kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv,
self.testcaseEnv)
+        
+                    # ==========================
+                    # leaderDict looks like this:
+                    # ==========================
+                    #{'entity_id': u'3',
+                    # 'partition': '0',
+                    # 'timestamp': 1345050255.8280001,
+                    # 'hostname': u'localhost',
+                    # 'topic': 'test_1',
+                    # 'brokerid': '3'}
+        
+                    # =============================================
+                    # validate to see if leader election is successful
+                    # =============================================
+                    self.log_message("validating leader election")
+                    result = kafka_system_test_utils.validate_leader_election_successful(
\
+                                 self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
+        
+                    # =============================================
+                    # get leader re-election latency by stopping leader
+                    # =============================================
+                    bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
+                    self.log_message("bounce_leader flag : " + bounceLeaderFlag)
+                    if (bounceLeaderFlag.lower() == "true"):
+                        reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv,
self.testcaseEnv, leaderDict)
+                        latencyKeyName = "Leader Election Latency - iter " + str(i) + " brokerid
" + leaderDict["brokerid"]
+                        self.testcaseEnv.validationStatusDict[latencyKeyName] = str("{0:.2f}".format(reelectionLatency
* 1000)) + " ms"
+       
+                    # =============================================
+                    # starting previously terminated broker 
+                    # =============================================
+                    if bounceLeaderFlag.lower() == "true":
+                        self.log_message("starting the previously terminated broker")
+                        stoppedLeaderEntityId  = leaderDict["entity_id"]
+                        kafka_system_test_utils.start_entity_in_background(self.systemTestEnv,
self.testcaseEnv, stoppedLeaderEntityId)
+
                     self.anonLogger.info("sleeping for 5s")
                     time.sleep(5)
+                    i += 1
+                # while loop
+
+                # tell producer to stop
+                self.testcaseEnv.lock.acquire()
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
+                time.sleep(1)
+                self.testcaseEnv.lock.release()
+                time.sleep(1)
+
+                while 1:
+                    self.testcaseEnv.lock.acquire()
+                    self.logger.info("status of backgroundProducerStopped : [" + \
+                        str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"])
+ "]", extra=self.d)
+                    if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
+                        time.sleep(1)
+                        break
+                    time.sleep(1)
+                    self.testcaseEnv.lock.release()
+                    time.sleep(2)
 
                 # =============================================
                 # starting consumer
@@ -240,12 +273,15 @@ class ReplicaBasicTest(SetupUtils):
                 kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
                 self.anonLogger.info("sleeping for 10s")
                 time.sleep(10)
-                
+                    
                 # this testcase is completed - so stopping all entities
                 self.log_message("stopping all entities")
                 for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
                     kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId,
parentPid)
-                    
+
+                # make sure all entities are stopped
+                kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)
+
                 # validate the data matched
                 # =============================================
                 self.log_message("validating data matched")
@@ -268,10 +304,6 @@ class ReplicaBasicTest(SetupUtils):
                                              self.testcaseEnv.testCaseDashboardsDir,
                                              self.systemTestEnv.clusterEntityConfigDictList)
                 
-                # stop metrics processes
-                for entity in self.systemTestEnv.clusterEntityConfigDictList:
-                    metrics.stop_metrics_collection(entity['hostname'], entity['jmx_port'])
-                                        
             except Exception as e:
                 self.log_message("Exception while running test {0}".format(e))
                 traceback.print_exc()
@@ -297,5 +329,4 @@ class ReplicaBasicTest(SetupUtils):
                         self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname,
"entity_id")
                     kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv,
producerEntityId, producerPPid)
 
-                #kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)
 

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json?rev=1384854&r1=1384853&r2=1384854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
(original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
Fri Sep 14 17:20:04 2012
@@ -3,7 +3,10 @@
   "testcase_args": {
     "bounce_leader": "true",
     "replica_factor": "3",
-    "num_partition": "1"
+    "num_partition": "2",
+    "num_iteration": "2",
+    "sleep_seconds_between_producer_calls": "1",
+    "num_messages_to_produce_per_producer_call": "50"
   },
   "entities": [
     {

Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1384854&r1=1384853&r2=1384854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Fri Sep 14 17:20:04
2012
@@ -29,6 +29,7 @@ import os
 import re
 import subprocess
 import sys
+import thread
 import time
 import traceback
 
@@ -370,7 +371,7 @@ def get_broker_shutdown_log_line(systemT
             line = line.rstrip('\n')
 
             if testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] in line:
-                logger.info("found the log line : " + line, extra=d)
+                logger.debug("found the log line : " + line, extra=d)
                 try:
                     matchObj    = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"],
line)
                     datetimeStr = matchObj.group(1)
@@ -379,7 +380,7 @@ def get_broker_shutdown_log_line(systemT
                     #print "{0:.3f}".format(unixTs)
                     shutdownBrokerDict["timestamp"] = unixTs
                     shutdownBrokerDict["brokerid"]  = matchObj.group(2)
-                    logger.info("brokerid: [" + shutdownBrokerDict["brokerid"] + "] entity_id:
[" + shutdownBrokerDict["entity_id"] + "]", extra=d)
+                    logger.debug("brokerid: [" + shutdownBrokerDict["brokerid"] + "] entity_id:
[" + shutdownBrokerDict["entity_id"] + "]", extra=d)
                     return shutdownBrokerDict
                 except:
                     logger.error("ERROR [unable to find matching leader details: Has the
matching pattern changed?]", extra=d)
@@ -423,7 +424,7 @@ def get_leader_elected_log_line(systemTe
             line = line.rstrip('\n')
 
             if testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] in line:
-                logger.info("found the log line : " + line, extra=d)
+                logger.debug("found the log line : " + line, extra=d)
                 try:
                     matchObj    = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"],
line)
                     datetimeStr = matchObj.group(1)
@@ -441,7 +442,7 @@ def get_leader_elected_log_line(systemTe
                         leaderDict["partition"] = matchObj.group(4)
                         leaderDict["entity_id"] = brokerEntityId
                         leaderDict["hostname"]  = hostname
-                    logger.info("brokerid: [" + leaderDict["brokerid"] + "] entity_id: ["
+ leaderDict["entity_id"] + "]", extra=d)
+                    logger.debug("brokerid: [" + leaderDict["brokerid"] + "] entity_id: ["
+ leaderDict["entity_id"] + "]", extra=d)
                 except:
                     logger.error("ERROR [unable to find matching leader details: Has the
matching pattern changed?]", extra=d)
                     raise
@@ -504,7 +505,7 @@ def start_entity_in_background(systemTes
     system_test_utils.async_sys_call(cmdStr)
     time.sleep(5)
 
-    pidCmdStr = "ssh " + hostname + " 'cat " + logPathName + "/entity_" + entityId + "_pid'"
+    pidCmdStr = "ssh " + hostname + " 'cat " + logPathName + "/entity_" + entityId + "_pid'
2> /dev/null"
     logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
     subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
 
@@ -551,7 +552,7 @@ def start_console_consumer(systemTestEnv
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
-                   commandArgs + " &> " + consumerLogPathName,
+                   commandArgs + " >> " + consumerLogPathName,
                    " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId +
"_pid'"]
 
         cmdStr = " ".join(cmdList)
@@ -573,75 +574,98 @@ def start_console_consumer(systemTestEnv
                 tokens = line.split(':')
                 testcaseEnv.consumerHostParentPidDict[host] = tokens[1]
 
-
 def start_producer_performance(systemTestEnv, testcaseEnv):
 
-    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
-    testcaseConfigsList         = testcaseEnv.testcaseConfigsList
+    entityConfigList     = systemTestEnv.clusterEntityConfigDictList
+    testcaseConfigsList  = testcaseEnv.testcaseConfigsList
     brokerListStr = ""
 
     # construct "broker-list" for producer
-    for clusterEntityConfigDict in clusterEntityConfigDictList:
-        entityRole = clusterEntityConfigDict["role"]
+    for entityConfig in entityConfigList:
+        entityRole = entityConfig["role"]
         if entityRole == "broker":
-            hostname = clusterEntityConfigDict["hostname"]
-            entityId = clusterEntityConfigDict["entity_id"]
+            hostname = entityConfig["hostname"]
+            entityId = entityConfig["entity_id"]
             port     = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id",
entityId, "port")
 
             if len(brokerListStr) == 0:
                 brokerListStr = hostname + ":" + port
             else:
                 brokerListStr = brokerListStr + "," + hostname + ":" + port
- 
 
-    producerConfigList = system_test_utils.get_dict_from_list_of_dicts( \
-                             clusterEntityConfigDictList, "role", "producer_performance")
+    producerConfigList = system_test_utils.get_dict_from_list_of_dicts(entityConfigList,
"role", "producer_performance")
     for producerConfig in producerConfigList:
         host              = producerConfig["hostname"]
         entityId          = producerConfig["entity_id"]
         jmxPort           = producerConfig["jmx_port"] 
         role              = producerConfig["role"] 
-        kafkaHome         = system_test_utils.get_data_by_lookup_keyval( \
-                                clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
-        javaHome          = system_test_utils.get_data_by_lookup_keyval( \
-                                clusterEntityConfigDictList, "entity_id", entityId, "java_home")
-        jmxPort           = system_test_utils.get_data_by_lookup_keyval( \
-                                clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
-        kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
-
-        logger.info("starting producer preformance", extra=d)
 
-        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance",
entityId, "default")
-        producerLogPathName = producerLogPath + "/producer_performance.log"
-
-        testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName
-
-        commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"])
-        cmdList = ["ssh " + host,
-                   "'JAVA_HOME=" + javaHome,
-                   "JMX_PORT=" + jmxPort,
-                   kafkaRunClassBin + " kafka.perf.ProducerPerformance",
-                   "--broker-list " +brokerListStr,
-                   commandArgs + " &> " + producerLogPathName,
-                   " & echo pid:$! > " + producerLogPath + "/entity_" + entityId +
"_pid'"]
-
-        cmdStr = " ".join(cmdList)
-        logger.debug("executing command: [" + cmdStr + "]", extra=d)
-        system_test_utils.async_sys_call(cmdStr)
+        thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList,
producerConfig, brokerListStr))
         time.sleep(1)
         metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
-        pidCmdStr = "ssh " + host + " 'cat " + producerLogPath + "/entity_" + entityId +
"_pid'"
-        logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
-        subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, brokerListStr):
+    host              = producerConfig["hostname"]
+    entityId          = producerConfig["entity_id"]
+    jmxPort           = producerConfig["jmx_port"] 
+    role              = producerConfig["role"] 
+    kafkaHome         = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id",
entityId, "kafka_home")
+    javaHome          = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id",
entityId, "java_home")
+    jmxPort           = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id",
entityId, "jmx_port")
+    kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
+
+    logger.info("starting producer preformance", extra=d)
+
+    producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance",
entityId, "default")
+    producerLogPathName = producerLogPath + "/producer_performance.log"
+
+    testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName
+    commandArgs = system_test_utils.convert_keyval_to_cmd_args(testcaseEnv.userDefinedEnvVarDict["producerConfigPathName"])
+
+    counter = 0
+    noMsgPerBatch    = int(testcaseEnv.testcaseArgumentsDict["num_messages_to_produce_per_producer_call"])
+    producerSleepSec = int(testcaseEnv.testcaseArgumentsDict["sleep_seconds_between_producer_calls"])
+
+    # keep calling producer until signaled by:
+    # testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]
+    while 1:
+        testcaseEnv.lock.acquire()
+        if not testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]:
+            initMsgId = counter * noMsgPerBatch
+
+            logger.info("#### [producer thread] status of stopBackgroundProducer : [False]
=> producing [" + str(noMsgPerBatch) + \
+                        "] messages with starting message id : [" + str(initMsgId) + "]",
extra=d)
+
+            cmdList = ["ssh " + host,
+                       "'JAVA_HOME=" + javaHome,
+                       "JMX_PORT=" + jmxPort,
+                       kafkaRunClassBin + " kafka.perf.ProducerPerformance",
+                       "--broker-list " + brokerListStr,
+                       "--initial-message-id " + str(initMsgId),
+                       "--messages " + str(noMsgPerBatch),
+                       commandArgs + " >> " + producerLogPathName,
+                       " & echo pid:$! > " + producerLogPath + "/entity_" + entityId
+ "_pid'"]
+
+            cmdStr = " ".join(cmdList)
+            logger.debug("executing command: [" + cmdStr + "]", extra=d)
+
+            subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+            for line in subproc.stdout.readlines():
+                pass    # dummy loop to wait until producer is completed
+        else:
+            testcaseEnv.lock.release()
+            break
 
-        # keep track of the remote entity pid in a dictionary
-        for line in subproc.stdout.readlines():
-            if line.startswith("pid"):
-                line = line.rstrip('\n')
-                logger.debug("found pid line: [" + line + "]", extra=d)
-                tokens = line.split(':')
-                testcaseEnv.producerHostParentPidDict[host] = tokens[1]
+        counter += 1
+        testcaseEnv.lock.release()
+        time.sleep(int(producerSleepSec))
+
+    # let the main testcase know producer has stopped
+    testcaseEnv.lock.acquire()
+    testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = True
+    time.sleep(1)
+    testcaseEnv.lock.release()
+    time.sleep(1)
 
 
 def stop_remote_entity(systemTestEnv, entityId, parentPid):
@@ -650,7 +674,7 @@ def stop_remote_entity(systemTestEnv, en
     hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList,
"entity_id", entityId, "hostname")
     pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
 
-    logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
+    logger.debug("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
     system_test_utils.sigterm_remote_process(hostname, pidStack)
 #    time.sleep(1)
 #    system_test_utils.sigkill_remote_process(hostname, pidStack)
@@ -662,7 +686,7 @@ def force_stop_remote_entity(systemTestE
     hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList,
"entity_id", entityId, "hostname")
     pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
 
-    logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
+    logger.debug("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
     system_test_utils.sigkill_remote_process(hostname, pidStack)
 
 
@@ -737,7 +761,9 @@ def validate_data_matched(systemTestEnv,
     outfile.close()
 
     logger.info("no. of unique messages sent from publisher  : " + str(len(producerMsgIdSet)),
extra=d)
-    logger.info("no. of unique messages received by consumer : " + str(len(producerMsgIdSet)),
extra=d)
+    logger.info("no. of unique messages received by consumer : " + str(len(consumerMsgIdSet)),
extra=d)
+    validationStatusDict["Unique messages from producer"] = str(len(producerMsgIdSet))
+    validationStatusDict["Unique messages from consumer"] = str(len(consumerMsgIdSet))
 
     if ( len(missingMsgIdInConsumer) == 0 and len(producerMsgIdSet) > 0 ):
         validationStatusDict["Validate for data matched"] = "PASSED"
@@ -892,15 +918,15 @@ def get_reelection_latency(systemTestEnv
     # get broker shut down completed timestamp
     shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv)
     #print shutdownBrokerDict
-    logger.info("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownBrokerDict["timestamp"])),
extra=d)
+    logger.debug("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownBrokerDict["timestamp"])),
extra=d)
 
-    logger.info("looking up new leader", extra=d)
+    logger.debug("looking up new leader", extra=d)
     leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv)
     #print leaderDict2
-    logger.info("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])),
extra=d)
+    logger.debug("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])),
extra=d)
     leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownBrokerDict["timestamp"])
     logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
-    testcaseEnv.validationStatusDict["Leader Election Latency"] = str("{0:.2f}".format(leaderReElectionLatency
* 1000)) + " ms"
+    #testcaseEnv.validationStatusDict["Leader Election Latency"] = str("{0:.2f}".format(leaderReElectionLatency
* 1000)) + " ms"
  
     return leaderReElectionLatency
 

Modified: incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/testcase_env.py?rev=1384854&r1=1384853&r2=1384854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/testcase_env.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/testcase_env.py Fri Sep 14 17:20:04 2012
@@ -23,6 +23,7 @@
 import json
 import os
 import sys
+import thread
 
 class TestcaseEnv():
 
@@ -83,11 +84,11 @@ class TestcaseEnv():
         self.testCaseBaseDir       = ""
         self.testCaseLogsDir       = ""
         self.testCaseDashboardsDir = ""
+
         # ================================
         # dictionary to keep track of
         # user-defined environment variables
         # ================================
-
         # LEADER_ELECTION_COMPLETED_MSG = "completed the leader state transition"
         # REGX_LEADER_ELECTION_PATTERN  = "\[(.*?)\] .* Broker (.*?) " + \
         #                            LEADER_ELECTION_COMPLETED_MSG + \
@@ -97,6 +98,9 @@ class TestcaseEnv():
         # consumerConfigPathName = ""
         # producerLogPathName    = ""
         # producerConfigPathName = ""
-
         self.userDefinedEnvVarDict = {}
 
+        # Lock object for producer threads synchronization
+        self.lock = thread.allocate_lock()
+
+



Mime
View raw message