kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-924 (follow-up); Specify console consumer properties via a single --property command line parameter; ; patched by Sriharsha Chintalapani; reviewed by Jun Rao
Date Tue, 03 Jun 2014 14:35:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 02311c064 -> 3a048e80d


kafka-924 (follow-up); Specify console consumer properties via a single --property command
line parameter;;  patched by Sriharsha Chintalapani; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a048e80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a048e80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a048e80

Branch: refs/heads/trunk
Commit: 3a048e80d526d5eaa39ec1588c6ee47af975d015
Parents: 02311c0
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Tue Jun 3 07:36:00 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Jun 3 07:36:00 2014 -0700

----------------------------------------------------------------------
 system_test/utils/kafka_system_test_utils.py | 34 ++++++++++++++++++-----
 1 file changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3a048e80/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index de02e47..8cde3c4 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -792,14 +792,20 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
         except:
             pass
 
-        # 4. group
-        groupOption = ""
-        try:
+	# 4. consumer config
+	consumerProperties = {}
+	consumerProperties["consumer.timeout.ms"] = timeoutMs
+	try:
             groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id")
-            groupOption = "--group " + groupOption
+            consumerProperties["group.id"] = groupOption
         except:
             pass
 
+	props_file_path=write_consumer_properties(consumerProperties)
+	scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/"
+	logger.debug("executing command [" + scpCmdStr + "]", extra=d)
+	system_test_utils.sys_call(scpCmdStr)
+
         if len(formatterOption) > 0:
             formatterOption = " --formatter " + formatterOption + " "
 
@@ -818,9 +824,8 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                    kafkaHome + "/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer",
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
-                   "--consumer-timeout-ms " + timeoutMs,
+                   "--consumer.config /tmp/consumer.properties",
                    "--csv-reporter-enabled",
-                   groupOption,
                    formatterOption,
                    "--from-beginning",
                    " >> " + logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -925,13 +930,20 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
             logger.error("Invalid cluster name : " + clusterName, extra=d)
             sys.exit(1)
 
+	consumerProperties = {}
+	consumerProperties["consumer.timeout.ms"] = timeoutMs
+	props_file_path=write_consumer_properties(consumerProperties)
+	scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/"
+	logger.debug("executing command [" + scpCmdStr + "]", extra=d)
+	system_test_utils.sys_call(scpCmdStr)
+
         cmdList = ["ssh " + host,
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
-                   "--consumer-timeout-ms " + timeoutMs,
+                   "--consumer.config /tmp/consumer.properties",
                    "--csv-reporter-enabled",
                    #"--metrics-dir " + metricsDir,
                    formatterOption,
@@ -2484,4 +2496,12 @@ def get_leader_attributes(systemTestEnv, testcaseEnv):
     return leaderDict
 
 
+def write_consumer_properties(consumerProperties):
+    import tempfile
+    props_file_path = tempfile.gettempdir() + "/consumer.properties"
+    consumer_props_file=open(props_file_path,"w")
+    for key,value in consumerProperties.iteritems():
+        consumer_props_file.write(key+"="+value+"\n")
+    consumer_props_file.close()
+    return props_file_path
 


Mime
View raw message