kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject [1/5] KAFKA-1012; Consumer offset management in Kafka; patched by Tejas Patil and Joel Koshy; feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram Subramanian, Joe Stein, Chris Riccomini
Date Fri, 14 Mar 2014 22:14:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 84a3a9a3d -> a670537aa


http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 29ab2ba..35f2d1b 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -539,6 +539,17 @@ def start_brokers(systemTestEnv, testcaseEnv):
     for brokerEntityId in brokerEntityIdList:
         start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
 
+def start_console_consumers(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
+
+    if onlyThisEntityId is not None:
+        start_entity_in_background(systemTestEnv, testcaseEnv, onlyThisEntityId)
+    else:
+        clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+        consoleConsumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(
+            clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
+        for entityId in consoleConsumerEntityIdList:
+            start_entity_in_background(systemTestEnv, testcaseEnv, entityId)
+
 
 def start_mirror_makers(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
 
@@ -751,10 +762,72 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                       logPathName + "/" + logFile + " & echo pid:$! > ",
                       logPathName + "/entity_" + entityId + "_pid'"]
 
+    elif role == "console_consumer":
+        clusterToConsumeFrom = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "cluster_name")
+        numTopicsForAutoGenString = -1
+        try:
+            numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"])
+        except:
+            pass
+
+        topic = ""
+        if numTopicsForAutoGenString < 0:
+            topic = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic")
+        else:
+            topic = generate_topics_string("topic", numTopicsForAutoGenString)
+
+        # update this variable and will be used by data validation functions
+        testcaseEnv.consumerTopicsString = topic
+
+        # 2. consumer timeout
+        timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "consumer-timeout-ms")
+
+        # 3. consumer formatter
+        formatterOption = ""
+        try:
+            formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "formatter")
+        except:
+            pass
+
+        # 4. group
+        groupOption = ""
+        try:
+            groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id")
+            groupOption = "--group " + groupOption
+        except:
+            pass
+
+        if len(formatterOption) > 0:
+            formatterOption = " --formatter " + formatterOption + " "
+
+        # get zookeeper connect string
+        zkConnectStr = ""
+        if clusterName == "source":
+            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+        elif clusterName == "target":
+            zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
+        else:
+            logger.error("Invalid cluster name : " + clusterName, extra=d)
+            sys.exit(1)
+        cmdList = ["ssh " + hostname,
+                   "'JAVA_HOME=" + javaHome,
+                   "JMX_PORT=" + jmxPort,
+                   kafkaHome + "/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer",
+                   "--zookeeper " + zkConnectStr,
+                   "--topic " + topic,
+                   "--consumer-timeout-ms " + timeoutMs,
+                   "--csv-reporter-enabled",
+                   groupOption,
+                   formatterOption,
+                   "--from-beginning",
+                   " >> " + logPathName + "/" + logFile + " & echo pid:$! > ",
+                   logPathName + "/entity_" + entityId + "_pid'"]
+
     cmdStr = " ".join(cmdList)
 
     logger.debug("executing command: [" + cmdStr + "]", extra=d)
     system_test_utils.async_sys_call(cmdStr)
+    logger.info("sleeping for 5 seconds.", extra=d)
     time.sleep(5)
 
     pidCmdStr = "ssh " + hostname + " 'cat " + logPathName + "/entity_" + entityId + "_pid' 2> /dev/null"
@@ -773,6 +846,8 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                 testcaseEnv.entityBrokerParentPidDict[entityId] = tokens[1]
             elif role == "mirror_maker":
                 testcaseEnv.entityMirrorMakerParentPidDict[entityId] = tokens[1]
+            elif role == "console_consumer":
+                testcaseEnv.entityConsoleConsumerParentPidDict[entityId] = tokens[1]
 
 
 def start_console_consumer(systemTestEnv, testcaseEnv):
@@ -1117,7 +1192,7 @@ def stop_remote_entity(systemTestEnv, entityId, parentPid, signalType="SIGTERM")
     hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
     pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
 
-    logger.debug("terminating (" + signalType + ") process id: " + parentPid + " in host: " + hostname, extra=d)
+    logger.info("terminating (" + signalType + ") process id: " + parentPid + " in host: " + hostname, extra=d)
 
     if signalType.lower() == "sigterm":
         system_test_utils.sigterm_remote_process(hostname, pidStack)
@@ -1138,7 +1213,7 @@ def force_stop_remote_entity(systemTestEnv, entityId, parentPid):
     system_test_utils.sigkill_remote_process(hostname, pidStack)
 
 
-def create_topic(systemTestEnv, testcaseEnv):
+def create_topic_for_producer_performance(systemTestEnv, testcaseEnv):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
     prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
@@ -1184,6 +1259,40 @@ def create_topic(systemTestEnv, testcaseEnv):
             logger.debug("executing command: [" + cmdStr + "]", extra=d)
             subproc = system_test_utils.sys_call_return_subproc(cmdStr)
 
+def create_topic(systemTestEnv, testcaseEnv, topic, replication_factor, num_partitions):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    zkEntityId      = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
+    kafkaHome       = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
+    javaHome        = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
+    createTopicBin  = kafkaHome + "/bin/kafka-topics.sh --create"
+    zkConnectStr = ""
+    zkHost = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname")
+    if len(testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]) > 0:
+        zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
+    elif len(testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]) > 0:
+        zkConnectStr = testcaseEnv.userDefinedEnvVarDict["targetZkConnectStr"]
+    else:
+        raise Exception("Empty zkConnectStr found")
+
+    testcaseBaseDir = testcaseEnv.testCaseBaseDir
+
+    testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome)
+
+    logger.debug("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d)
+    cmdList = ["ssh " + zkHost,
+               "'JAVA_HOME=" + javaHome,
+               createTopicBin,
+               " --topic "     + topic,
+               " --zookeeper " + zkConnectStr,
+               " --replication-factor "   + str(replication_factor),
+               " --partitions " + str(num_partitions) + " >> ",
+               testcaseBaseDir + "/logs/create_source_cluster_topic.log'"]
+
+    cmdStr = " ".join(cmdList)
+    logger.info("executing command: [" + cmdStr + "]", extra=d)
+    subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+
+
 
 def get_message_id(logPathName, topic=""):
     logLines      = open(logPathName, "r").readlines()
@@ -1221,7 +1330,7 @@ def get_message_checksum(logPathName):
 
 
 def validate_data_matched(systemTestEnv, testcaseEnv, replicationUtils):
-    logger.debug("#### Inside validate_data_matched", extra=d)
+    logger.info("#### Inside validate_data_matched", extra=d)
 
     validationStatusDict        = testcaseEnv.validationStatusDict
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
@@ -1245,11 +1354,11 @@ def validate_data_matched(systemTestEnv, testcaseEnv, replicationUtils):
             consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
             if consumerTopic in topic:
                 matchingConsumerEntityId = consumerEntityId
-                logger.debug("matching consumer entity id found", extra=d)
+                logger.info("matching consumer entity id found", extra=d)
                 break
 
         if matchingConsumerEntityId is None:
-            logger.debug("matching consumer entity id NOT found", extra=d)
+            logger.info("matching consumer entity id NOT found", extra=d)
             break
 
         msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( \
@@ -1337,7 +1446,7 @@ def cleanup_data_at_remote_hosts(systemTestEnv, testcaseEnv):
     logger.info("cleaning up test case dir: [" + testCaseBaseDir + "]", extra=d)
 
     if "system_test" not in testCaseBaseDir:
-        logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
+        # logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
         logger.warn("check config file: system_test/cluster_config.properties", extra=d)
         logger.warn("aborting test...", extra=d)
         sys.exit(1)
@@ -1539,6 +1648,9 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
     for entityId, mirrorMakerParentPid in testcaseEnv.entityMirrorMakerParentPidDict.items():
         stop_remote_entity(systemTestEnv, entityId, mirrorMakerParentPid)
 
+    for entityId, consumerParentPid in testcaseEnv.entityConsoleConsumerParentPidDict.items():
+        stop_remote_entity(systemTestEnv, entityId, consumerParentPid)
+
     for entityId, brokerParentPid in testcaseEnv.entityBrokerParentPidDict.items():
         stop_remote_entity(systemTestEnv, entityId, brokerParentPid)
 
@@ -2119,7 +2231,6 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
     prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
-    consumerCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer")
 
     for prodPerfCfg in prodPerfCfgList:
         producerEntityId = prodPerfCfg["entity_id"]
@@ -2147,7 +2258,7 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
         topicList = topicStr.split(',')
         for topic in topicList:
             consumerDuplicateCount = 0
-            msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( 
+            msgIdMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname(
                                                 testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \
                                                 + "/msg_id_missing_in_consumer_" + topic + ".log"
             producerMsgIdList  = get_message_id(producerLogPathName, topic)
@@ -2163,8 +2274,11 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
                 outfile.write(id + "\n")
             outfile.close()
 
+            logger.info("Producer entity id " + producerEntityId, extra=d)
+            logger.info("Consumer entity id " + matchingConsumerEntityId, extra=d)
             logger.info("no. of unique messages on topic [" + topic + "] sent from publisher : " + str(len(producerMsgIdSet)), extra=d)
             logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgIdSet)), extra=d)
+            logger.info("no. of duplicate messages on topic [" + topic + "] received by consumer: " + str(consumerDuplicateCount), extra=d)
             validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgIdSet))
             validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgIdSet))
 
@@ -2263,6 +2377,43 @@ def validate_index_log(systemTestEnv, testcaseEnv, clusterName="source"):
     else:
         validationStatusDict["Validate index log in cluster [" + clusterName + "]"] = "FAILED"
 
+def get_leader_for(systemTestEnv, testcaseEnv, topic, partition):
+    logger.info("Querying Zookeeper for leader info for topic " + topic, extra=d)
+    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
+    tcConfigsList      = testcaseEnv.testcaseConfigsList
+
+    zkDictList         = system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", "zookeeper")
+    firstZkDict        = zkDictList[0]
+    hostname           = firstZkDict["hostname"]
+    zkEntityId         = firstZkDict["entity_id"]
+    clientPort         = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", zkEntityId, "clientPort")
+    kafkaHome          = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "kafka_home")
+    javaHome           = system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", zkEntityId, "java_home")
+    kafkaRunClassBin   = kafkaHome + "/bin/kafka-run-class.sh"
+
+    zkQueryStr = "get /brokers/topics/" + topic + "/partitions/" + str(partition) + "/state"
+    brokerid   = ''
+    leaderEntityId = ''
+
+    cmdStrList = ["ssh " + hostname,
+                  "\"JAVA_HOME=" + javaHome,
+                  kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain",
+                  "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
+                  zkQueryStr + " 2> /dev/null | tail -1\""]
+    cmdStr = " ".join(cmdStrList)
+    logger.info("executing command [" + cmdStr + "]", extra=d)
+    subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+    for line in subproc.stdout.readlines():
+        if "\"leader\"" in line:
+            line = line.rstrip('\n')
+            json_data = json.loads(line)
+            for key,val in json_data.items():
+                if key == 'leader':
+                    brokerid = str(val)
+            leaderEntityId = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "broker.id", brokerid, "entity_id")
+            break
+    return leaderEntityId
+
 def get_leader_attributes(systemTestEnv, testcaseEnv):
 
     logger.info("Querying Zookeeper for leader info ...", extra=d)
@@ -2294,11 +2445,10 @@ def get_leader_attributes(systemTestEnv, testcaseEnv):
                   "-server " + testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
                   zkQueryStr + " 2> /dev/null | tail -1\""]
     cmdStr = " ".join(cmdStrList)
-    logger.debug("executing command [" + cmdStr + "]", extra=d)
+    logger.info("executing command [" + cmdStr + "]", extra=d)
 
     subproc = system_test_utils.sys_call_return_subproc(cmdStr)
     for line in subproc.stdout.readlines():
-        logger.debug("zk returned : " + line, extra=d)
         if "\"leader\"" in line:
             line = line.rstrip('\n')
             json_data = json.loads(line)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/utils/testcase_env.py
----------------------------------------------------------------------
diff --git a/system_test/utils/testcase_env.py b/system_test/utils/testcase_env.py
index bee8716..b3c2910 100644
--- a/system_test/utils/testcase_env.py
+++ b/system_test/utils/testcase_env.py
@@ -51,6 +51,12 @@ class TestcaseEnv():
     # { 0: 12345, 1: 12389, ... }
     entityMirrorMakerParentPidDict = {}
 
+    # dictionary of entity_id to ppid for console-consumer entities
+    # key: entity_id
+    # val: ppid of console consumer associated to that entity_id
+    # { 0: 12345, 1: 12389, ... }
+    entityConsoleConsumerParentPidDict = {}
+
     # dictionary of entity_id to ppid for migration tool entities
     # key: entity_id
     # val: ppid of broker associated to that entity_id


Mime
View raw message