kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/3] kafka-1240; Add ability to existing system tests to use the new producer client; patched by Jun Rao; reviewed by Neha Narkhede
Date Fri, 07 Mar 2014 03:07:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 74c54c7ee -> c765d7bd4


http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_10110/testcase_0110_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_10110/testcase_0110_properties.json b/system_test/replication_testsuite/testcase_10110/testcase_0110_properties.json
new file mode 100644
index 0000000..f51abc1
--- /dev/null
+++ b/system_test/replication_testsuite/testcase_10110/testcase_0110_properties.json
@@ -0,0 +1,86 @@
+{
+  "description": {"01":"Leader Failure in Replication : 1. mode => async; 2. acks => 1; 3. comp => 1; 4. no. of partitions => 3; 5. log segment size => 1M",
+                  "02":"Produce and consume messages to a single topic - 3 partitions.",
+                  "03":"This test sends messages to 3 replicas",
+                  "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)",
+                  "05":"Restart the terminated broker",
+                  "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully",
+                  "07":"At the end it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:async, acks:1, comp:1",
+                  "10":"Log segment size    : 1048576 (1M)"
+  },
+  "testcase_args": {
+    "broker_type": "leader",
+    "bounce_broker": "true",
+    "replica_factor": "3",
+    "num_partition": "3",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "1048576",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "3",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "1048576",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "3",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "1048576",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "3",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "1",
+      "sync":"false",
+      "log_filename": "producer_performance.log",
+      "config_filename": "producer_performance.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "zookeeper": "localhost:2188",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer.properties"
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_10131/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_10131/cluster_config.json b/system_test/replication_testsuite/testcase_10131/cluster_config.json
new file mode 100644
index 0000000..cf147eb
--- /dev/null
+++ b/system_test/replication_testsuite/testcase_10131/cluster_config.json
@@ -0,0 +1,76 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9990"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9991"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9992"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9993"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9997"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9998"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9999"
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9099"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_10131/testcase_0131_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_10131/testcase_0131_properties.json b/system_test/replication_testsuite/testcase_10131/testcase_0131_properties.json
new file mode 100644
index 0000000..a140882
--- /dev/null
+++ b/system_test/replication_testsuite/testcase_10131/testcase_0131_properties.json
@@ -0,0 +1,110 @@
+{
+  "description": {"01":"Leader Failure in Replication with multi topics & partitions : Base Test",
+                  "02":"Produce and consume messages to 2 topics - 3 partitions",
+                  "03":"This test sends messages to 2 replicas",
+                  "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)",
+                  "05":"Restart the terminated broker",
+                  "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully",
+                  "07":"At the end it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:sync, acks:-1, comp:0",
+                  "10":"Log segment size    : 102400"
+  },
+  "testcase_args": {
+    "broker_type": "leader",
+    "auto_create_topic": "true",
+    "bounce_broker": "true",
+    "replica_factor": "2",
+    "num_partition": "3",
+    "num_iteration": "3",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "102400",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "default.replication.factor": "2",
+      "num.partitions": "3",
+      "log.index.interval.bytes": "10",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "102400",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "default.replication.factor": "2",
+      "num.partitions": "3",
+      "log.index.interval.bytes": "10",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "102400",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "default.replication.factor": "2",
+      "num.partitions": "3",
+      "log.index.interval.bytes": "10",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "-1",
+      "producer-retry-backoff-ms": "300",
+      "sync":"true",
+      "log_filename": "producer_performance_4.log",
+      "config_filename": "producer_performance_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_2",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "-1",
+      "producer-retry-backoff-ms": "300",
+      "sync":"true",
+      "log_filename": "producer_performance_5.log",
+      "config_filename": "producer_performance_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_6.log",
+      "config_filename": "console_consumer_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "topic": "test_2",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_7.log",
+      "config_filename": "console_consumer_7.properties"
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_10132/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_10132/cluster_config.json b/system_test/replication_testsuite/testcase_10132/cluster_config.json
new file mode 100644
index 0000000..cf147eb
--- /dev/null
+++ b/system_test/replication_testsuite/testcase_10132/cluster_config.json
@@ -0,0 +1,76 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9990"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9991"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9992"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9993"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9997"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9998"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9999"
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9099"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_10132/testcase_0132_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_10132/testcase_0132_properties.json b/system_test/replication_testsuite/testcase_10132/testcase_0132_properties.json
new file mode 100644
index 0000000..48b30c7
--- /dev/null
+++ b/system_test/replication_testsuite/testcase_10132/testcase_0132_properties.json
@@ -0,0 +1,107 @@
+{
+  "description": {"01":"Leader Failure in Replication with multi topics & partitions : 1. acks => 1",
+                  "02":"Produce and consume messages to 2 topics - 3 partitions",
+                  "03":"This test sends messages to 2 replicas",
+                  "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)",
+                  "05":"Restart the terminated broker",
+                  "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully",
+                  "07":"At the end it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:sync, acks:1, comp:0",
+                  "10":"Log segment size    : 512000"
+  },
+  "testcase_args": {
+    "broker_type": "leader",
+    "auto_create_topic": "true",
+    "bounce_broker": "true",
+    "replica_factor": "2",
+    "num_partition": "3",
+    "num_iteration": "3",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "512000",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "default.replication.factor": "2",
+      "num.partitions": "3",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "512000",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "default.replication.factor": "2",
+      "num.partitions": "3",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "512000",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "default.replication.factor": "2",
+      "num.partitions": "3",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "1",
+      "producer-retry-backoff-ms": "300",
+      "sync":"true",
+      "log_filename": "producer_performance_4.log",
+      "config_filename": "producer_performance_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_2",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "1",
+      "producer-retry-backoff-ms": "300",
+      "sync":"true",
+      "log_filename": "producer_performance_5.log",
+      "config_filename": "producer_performance_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_6.log",
+      "config_filename": "console_consumer_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "topic": "test_2",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_7.log",
+      "config_filename": "console_consumer_7.properties"
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_10133/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_10133/cluster_config.json b/system_test/replication_testsuite/testcase_10133/cluster_config.json
new file mode 100644
index 0000000..cf147eb
--- /dev/null
+++ b/system_test/replication_testsuite/testcase_10133/cluster_config.json
@@ -0,0 +1,76 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9990"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9991"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9992"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9993"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9997"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9998"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9999"
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9099"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_10133/testcase_0133_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_10133/testcase_0133_properties.json b/system_test/replication_testsuite/testcase_10133/testcase_0133_properties.json
new file mode 100644
index 0000000..8276aae
--- /dev/null
+++ b/system_test/replication_testsuite/testcase_10133/testcase_0133_properties.json
@@ -0,0 +1,107 @@
+{
+  "description": {"01":"Leader Failure in Replication with multi topics & partitions : 1. mode => async; 2. comp => 1",
+                  "02":"Produce and consume messages to 2 topics - 3 partitions",
+                  "03":"This test sends messages to 2 replicas",
+                  "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)",
+                  "05":"Restart the terminated broker",
+                  "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully",
+                  "07":"At the end it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:async, acks:1, comp:1",
+                  "10":"Log segment size    : 512000"
+  },
+  "testcase_args": {
+    "broker_type": "leader",
+    "auto_create_topic": "true",
+    "bounce_broker": "true",
+    "replica_factor": "2",
+    "num_partition": "3",
+    "num_iteration": "3",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "512000",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "default.replication.factor": "2",
+      "num.partitions": "3",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "512000",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "default.replication.factor": "2",
+      "num.partitions": "3",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "512000",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "default.replication.factor": "2",
+      "num.partitions": "3",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "1",
+      "producer-retry-backoff-ms": "300",
+      "sync":"false",
+      "log_filename": "producer_performance_4.log",
+      "config_filename": "producer_performance_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_2",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "1",
+      "producer-retry-backoff-ms": "300",
+      "sync":"false",
+      "log_filename": "producer_performance_5.log",
+      "config_filename": "producer_performance_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_6.log",
+      "config_filename": "console_consumer_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "topic": "test_2",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_7.log",
+      "config_filename": "console_consumer_7.properties"
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_10134/testcase_0134_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_10134/testcase_0134_properties.json b/system_test/replication_testsuite/testcase_10134/testcase_0134_properties.json
new file mode 100644
index 0000000..73bb859
--- /dev/null
+++ b/system_test/replication_testsuite/testcase_10134/testcase_0134_properties.json
@@ -0,0 +1,92 @@
+{
+  "description": {"01":"Leader Failure in Replication with multi topics & partitions : 1. auto_create_topic => true",
+                  "02":"Produce and consume messages to 2 topics - 3 partitions",
+                  "03":"This test sends messages to 2 replicas",
+                  "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)",
+                  "05":"Restart the terminated broker",
+                  "06":"Lookup brokers' log4j messages and verify that leader is re-elected successfully",
+                  "07":"At the end it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:async, acks:-1, comp:1",
+                  "10":"Log segment size    : 102400"
+  },
+  "testcase_args": {
+    "broker_type": "leader",
+    "bounce_broker": "true",
+    "replica_factor": "2",
+    "num_partition": "3",
+    "num_iteration": "3",
+    "auto_create_topic": "true",
+    "producer_multi_topics_mode": "true",
+    "consumer_multi_topics_mode": "true",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "num.partitions": "3",
+      "default.replication.factor": "2",
+      "log.segment.bytes": "102400",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "log.index.interval.bytes": "10",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "broker.id": "2",
+      "num.partitions": "3",
+      "default.replication.factor": "2",
+      "log.segment.bytes": "102400",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "log.index.interval.bytes": "10",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "broker.id": "3",
+      "num.partitions": "3",
+      "default.replication.factor": "2",
+      "log.segment.bytes": "102400",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "log.index.interval.bytes": "10",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1,test_2",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "-1",
+      "producer-retry-backoff-ms": "3500",
+      "producer-num-retries": "3",
+      "sync":"false",
+      "log_filename": "producer_performance_4.log",
+      "config_filename": "producer_performance_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_1,test_2",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_5.log",
+      "config_filename": "console_consumer_5.properties"
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4001/testcase_4001_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4001/testcase_4001_properties.json b/system_test/replication_testsuite/testcase_4001/testcase_4001_properties.json
index d2ffd95..2652f16 100644
--- a/system_test/replication_testsuite/testcase_4001/testcase_4001_properties.json
+++ b/system_test/replication_testsuite/testcase_4001/testcase_4001_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "0",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4002/testcase_4002_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4002/testcase_4002_properties.json b/system_test/replication_testsuite/testcase_4002/testcase_4002_properties.json
index c86525d..8724597 100644
--- a/system_test/replication_testsuite/testcase_4002/testcase_4002_properties.json
+++ b/system_test/replication_testsuite/testcase_4002/testcase_4002_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "0",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4003/testcase_4003_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4003/testcase_4003_properties.json b/system_test/replication_testsuite/testcase_4003/testcase_4003_properties.json
index b77e4fd..4e3b6f5 100644
--- a/system_test/replication_testsuite/testcase_4003/testcase_4003_properties.json
+++ b/system_test/replication_testsuite/testcase_4003/testcase_4003_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4004/testcase_4004_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4004/testcase_4004_properties.json b/system_test/replication_testsuite/testcase_4004/testcase_4004_properties.json
index e753327..f8718a6 100644
--- a/system_test/replication_testsuite/testcase_4004/testcase_4004_properties.json
+++ b/system_test/replication_testsuite/testcase_4004/testcase_4004_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4005/testcase_4005_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4005/testcase_4005_properties.json b/system_test/replication_testsuite/testcase_4005/testcase_4005_properties.json
index 5468401..af96c7b 100644
--- a/system_test/replication_testsuite/testcase_4005/testcase_4005_properties.json
+++ b/system_test/replication_testsuite/testcase_4005/testcase_4005_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "0",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4006/testcase_4006_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4006/testcase_4006_properties.json b/system_test/replication_testsuite/testcase_4006/testcase_4006_properties.json
index e5ab0a0..e132236 100644
--- a/system_test/replication_testsuite/testcase_4006/testcase_4006_properties.json
+++ b/system_test/replication_testsuite/testcase_4006/testcase_4006_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "0",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4007/testcase_4007_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4007/testcase_4007_properties.json b/system_test/replication_testsuite/testcase_4007/testcase_4007_properties.json
index 7aa6e9a..5c4e5bb 100644
--- a/system_test/replication_testsuite/testcase_4007/testcase_4007_properties.json
+++ b/system_test/replication_testsuite/testcase_4007/testcase_4007_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4008/testcase_4008_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4008/testcase_4008_properties.json b/system_test/replication_testsuite/testcase_4008/testcase_4008_properties.json
index 08aa108..8dce9b2 100644
--- a/system_test/replication_testsuite/testcase_4008/testcase_4008_properties.json
+++ b/system_test/replication_testsuite/testcase_4008/testcase_4008_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4011/testcase_4011_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4011/testcase_4011_properties.json b/system_test/replication_testsuite/testcase_4011/testcase_4011_properties.json
index 512fafb..c6f1d1c 100644
--- a/system_test/replication_testsuite/testcase_4011/testcase_4011_properties.json
+++ b/system_test/replication_testsuite/testcase_4011/testcase_4011_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "0",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4012/testcase_4012_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4012/testcase_4012_properties.json b/system_test/replication_testsuite/testcase_4012/testcase_4012_properties.json
index 9b711af..bc1ff63 100644
--- a/system_test/replication_testsuite/testcase_4012/testcase_4012_properties.json
+++ b/system_test/replication_testsuite/testcase_4012/testcase_4012_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "0",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4013/testcase_4013_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4013/testcase_4013_properties.json b/system_test/replication_testsuite/testcase_4013/testcase_4013_properties.json
index 3836366..aa48a68 100644
--- a/system_test/replication_testsuite/testcase_4013/testcase_4013_properties.json
+++ b/system_test/replication_testsuite/testcase_4013/testcase_4013_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4014/testcase_4014_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4014/testcase_4014_properties.json b/system_test/replication_testsuite/testcase_4014/testcase_4014_properties.json
index 86ab75a..7acf8b6 100644
--- a/system_test/replication_testsuite/testcase_4014/testcase_4014_properties.json
+++ b/system_test/replication_testsuite/testcase_4014/testcase_4014_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4015/testcase_4015_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4015/testcase_4015_properties.json b/system_test/replication_testsuite/testcase_4015/testcase_4015_properties.json
index 82d51b6..7841273 100644
--- a/system_test/replication_testsuite/testcase_4015/testcase_4015_properties.json
+++ b/system_test/replication_testsuite/testcase_4015/testcase_4015_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "0",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4016/testcase_4016_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4016/testcase_4016_properties.json b/system_test/replication_testsuite/testcase_4016/testcase_4016_properties.json
index 31c1be0..0519d27 100644
--- a/system_test/replication_testsuite/testcase_4016/testcase_4016_properties.json
+++ b/system_test/replication_testsuite/testcase_4016/testcase_4016_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "0",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4017/testcase_4017_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4017/testcase_4017_properties.json b/system_test/replication_testsuite/testcase_4017/testcase_4017_properties.json
index 72f78b0..c29077b 100644
--- a/system_test/replication_testsuite/testcase_4017/testcase_4017_properties.json
+++ b/system_test/replication_testsuite/testcase_4017/testcase_4017_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_4018/testcase_4018_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_4018/testcase_4018_properties.json b/system_test/replication_testsuite/testcase_4018/testcase_4018_properties.json
index ee459f4..ab57e5a 100644
--- a/system_test/replication_testsuite/testcase_4018/testcase_4018_properties.json
+++ b/system_test/replication_testsuite/testcase_4018/testcase_4018_properties.json
@@ -67,6 +67,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -80,6 +81,7 @@
     },
     {
       "entity_id": "5",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json b/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
index 958eef7..e959aed 100644
--- a/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
+++ b/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "t001",
       "threads": "5",
       "compression-codec": "0",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/testcase_to_run_all.json
----------------------------------------------------------------------
diff --git a/system_test/testcase_to_run_all.json b/system_test/testcase_to_run_all.json
index 182160c..481f8e5 100644
--- a/system_test/testcase_to_run_all.json
+++ b/system_test/testcase_to_run_all.json
@@ -26,6 +26,17 @@
         "testcase_0109",
         "testcase_0110",
 
+        "testcase_10101",
+        "testcase_10102",
+        "testcase_10103",
+        "testcase_10104",
+        "testcase_10105",
+        "testcase_10106",
+        "testcase_10107",
+        "testcase_10108",
+        "testcase_10109",
+        "testcase_10110",
+
         "testcase_0111",
         "testcase_0112",
         "testcase_0113",
@@ -46,6 +57,12 @@
         "testcase_0131",
         "testcase_0132",
         "testcase_0133",
+        "testcase_0134",
+
+        "testcase_10131",
+        "testcase_10132",
+        "testcase_10133",
+        "testcase_10134",
 
         "testcase_0151",
         "testcase_0152",
@@ -118,6 +135,13 @@
         "testcase_5003",
         "testcase_5004",
         "testcase_5005",
-        "testcase_5006"
+        "testcase_5006",
+
+        "testcase_15001",
+        "testcase_15002",
+        "testcase_15003",
+        "testcase_15004",
+        "testcase_15005",
+        "testcase_15006"
     ]
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 5d2b7df..29ab2ba 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -696,6 +696,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
     configFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "config_filename")
     logFile    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename")
 
+    useNewProducer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "new-producer")
     mmConsumerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId,
                            "mirror_consumer_config_filename")
     mmProducerConfigFile = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId,
@@ -729,15 +730,26 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                   logPathName + "/entity_" + entityId + "_pid'"]
 
     elif role == "mirror_maker":
-        cmdList = ["ssh " + hostname,
-                  "'JAVA_HOME=" + javaHome,
-                 "JMX_PORT=" + jmxPort,
-                  kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker",
-                  "--consumer.config " + configPathName + "/" + mmConsumerConfigFile,
-                  "--producer.config " + configPathName + "/" + mmProducerConfigFile,
-                  "--whitelist=\".*\" >> ",
-                  logPathName + "/" + logFile + " & echo pid:$! > ",
-                  logPathName + "/entity_" + entityId + "_pid'"]
+        if useNewProducer.lower() == "true":
+            cmdList = ["ssh " + hostname,
+                      "'JAVA_HOME=" + javaHome,
+                      "JMX_PORT=" + jmxPort,
+                      kafkaHome + "/bin/kafka-run-class.sh kafka.tools.newproducer.MirrorMaker",
+                      "--consumer.config " + configPathName + "/" + mmConsumerConfigFile,
+                      "--producer.config " + configPathName + "/" + mmProducerConfigFile,
+                      "--whitelist=\".*\" >> ",
+                      logPathName + "/" + logFile + " & echo pid:$! > ",
+                      logPathName + "/entity_" + entityId + "_pid'"]
+        else:       
+            cmdList = ["ssh " + hostname,
+                      "'JAVA_HOME=" + javaHome,
+                      "JMX_PORT=" + jmxPort,
+                      kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker",
+                      "--consumer.config " + configPathName + "/" + mmConsumerConfigFile,
+                      "--producer.config " + configPathName + "/" + mmProducerConfigFile,
+                      "--whitelist=\".*\" >> ",
+                      logPathName + "/" + logFile + " & echo pid:$! > ",
+                      logPathName + "/entity_" + entityId + "_pid'"]
 
     cmdStr = " ".join(cmdList)
 
@@ -960,6 +972,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
     noMsgPerBatch  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message")
     requestNumAcks = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "request-num-acks")
     syncMode       = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "sync")
+    useNewProducer = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "new-producer")
     retryBackoffMs = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-retry-backoff-ms")
     numOfRetries   = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer-num-retries")
 
@@ -998,6 +1011,8 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
     boolArgumentsStr = ""
     if syncMode.lower() == "true":
         boolArgumentsStr = boolArgumentsStr + " --sync"
+    if useNewProducer.lower() == "true":
+        boolArgumentsStr = boolArgumentsStr + " --new-producer"
 
     # keep calling producer until signaled to stop by:
     # testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]
@@ -1506,6 +1521,7 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
             logger.info("status of backgroundProducerStopped : [" + \
                 str(testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=d)
             if testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
+                testcaseEnv.lock.release()
                 logger.info("all producer threads completed", extra=d)
                 break
             testcaseEnv.lock.release()


Mime
View raw message