kafka-commits mailing list archives

From: jun...@apache.org
Subject: svn commit: r1407680 [4/4] - in /incubator/kafka/branches/0.8/system_test: ./ migration_tool_testsuite/ migration_tool_testsuite/testcase_9001/ migration_tool_testsuite/testcase_9003/ migration_tool_testsuite/testcase_9004/ migration_tool_testsuite/tes...
Date: Fri, 09 Nov 2012 22:58:28 GMT
Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4016/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4016/cluster_config.json?rev=1407680&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4016/cluster_config.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4016/cluster_config.json Fri Nov  9 22:58:23 2012
@@ -0,0 +1,76 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9990"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9991"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9992"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9993"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9997"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9998"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9999"
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9191"
+        }
+    ]
+}
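
Each cluster_config.json in these testcases maps an entity_id to a role (zookeeper, broker, producer_performance, console_consumer) along with its hostname, kafka/java homes, and JMX port. As a minimal sketch, not part of this commit, such a file could be loaded and grouped by role as follows; the helper name load_cluster_config is hypothetical.

    import json
    from collections import defaultdict

    # Hypothetical loader: group the entities of a cluster_config.json by role.
    def load_cluster_config(path):
        with open(path) as f:
            config = json.load(f)
        by_role = defaultdict(list)
        for entity in config["cluster_config"]:
            by_role[entity["role"]].append(entity)
        return by_role

    # by_role["broker"] would hold entity_ids 1-3 above, one JMX port each.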

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4016/testcase_4016_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4016/testcase_4016_properties.json?rev=1407680&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4016/testcase_4016_properties.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4016/testcase_4016_properties.json Fri Nov  9 22:58:23 2012
@@ -0,0 +1,103 @@
+{
+  "description": {"01":"Broker Log Retention (Replica Factor < No. of Brokers) : 1. acks => 1",
+                  "02":"Produce and consume messages to 2 topics - 2 partitions",
+                  "03":"This test sends messages to 2 replicas",
+                  "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)",
+                  "05":"Restart the terminated broker",
+                  "06":"Look up brokers' log4j messages and verify that the leader is re-elected successfully",
+                  "07":"At the end it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:async, acks:1, comp:0",
+                  "10":"Log segment size    : 102400"
+  },
+  "testcase_args": {
+    "broker_type": "leader",
+    "bounce_broker": "true",
+    "replica_factor": "2",
+    "num_partition": "2",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "broker_down_time_in_sec": "5",
+    "message_producing_free_time_sec": "15",
+    "log_retention_test": "true"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "brokerid": "1",
+      "log.file.size": "102400",
+      "log.retention.size": "1048576",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "brokerid": "2",
+      "log.file.size": "102400",
+      "log.retention.size": "1048576",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "brokerid": "3",
+      "log.file.size": "102400",
+      "log.retention.size": "1048576",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "500",
+      "request-num-acks": "1",
+      "sync":"false",
+      "log_filename": "producer_performance_4.log",
+      "config_filename": "producer_performance_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_2",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "500",
+      "request-num-acks": "1",
+      "sync":"false",
+      "log_filename": "producer_performance_5.log",
+      "config_filename": "producer_performance_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "test_1",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer_6.log",
+      "config_filename": "console_consumer_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "topic": "test_2",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer_7.log",
+      "config_filename": "console_consumer_7.properties"
+    }
+  ]
+}
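
The description block above spells out the flow: produce to two topics, find the leader and terminate it by controlled failure (kill -15), restart it, then verify leader re-election, log size/contents, and message counts. Below is a minimal sketch of that bounce step, outside this commit; the pid argument stands in for the harness's own pid tracking.

    import os, signal, time

    def bounce_broker(pid, down_time_sec, restart_fn):
        os.kill(pid, signal.SIGTERM)    # controlled failure (kill -15)
        time.sleep(down_time_sec)       # broker_down_time_in_sec above is 5
        restart_fn()                    # restart the terminated broker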

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4017/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4017/cluster_config.json?rev=1407680&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4017/cluster_config.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4017/cluster_config.json Fri Nov  9 22:58:23 2012
@@ -0,0 +1,76 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9990"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9991"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9992"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9993"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9997"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9998"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9999"
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9191"
+        }
+    ]
+}

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4017/testcase_4017_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4017/testcase_4017_properties.json?rev=1407680&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4017/testcase_4017_properties.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4017/testcase_4017_properties.json Fri Nov  9 22:58:23 2012
@@ -0,0 +1,103 @@
+{
+  "description": {"01":"Broker Log Retention (Replica Factor < No. of Brokers) : 1. acks => -1, 2. comp => 1",
+                  "02":"Produce and consume messages to 2 topics - 2 partitions",
+                  "03":"This test sends messages to 2 replicas",
+                  "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)",
+                  "05":"Restart the terminated broker",
+                  "06":"Look up brokers' log4j messages and verify that the leader is re-elected successfully",
+                  "07":"At the end it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:async, acks:-1, comp:1",
+                  "10":"Log segment size    : 102400"
+  },
+  "testcase_args": {
+    "broker_type": "leader",
+    "bounce_broker": "true",
+    "replica_factor": "2",
+    "num_partition": "2",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "broker_down_time_in_sec": "5",
+    "message_producing_free_time_sec": "15",
+    "log_retention_test": "true"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "brokerid": "1",
+      "log.file.size": "102400",
+      "log.retention.size": "1048576",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "brokerid": "2",
+      "log.file.size": "102400",
+      "log.retention.size": "1048576",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "brokerid": "3",
+      "log.file.size": "102400",
+      "log.retention.size": "1048576",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "500",
+      "request-num-acks": "-1",
+      "sync":"false",
+      "log_filename": "producer_performance_4.log",
+      "config_filename": "producer_performance_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_2",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "500",
+      "request-num-acks": "-1",
+      "sync":"false",
+      "log_filename": "producer_performance_5.log",
+      "config_filename": "producer_performance_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "test_1",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer_6.log",
+      "config_filename": "console_consumer_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "topic": "test_2",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer_7.log",
+      "config_filename": "console_consumer_7.properties"
+    }
+  ]
+}
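
Testcase 4017 differs from 4016 only in its producer dimensions (request-num-acks: -1, compression-codec: 1). The producer entity keys read like long option names, so a hedged sketch of flattening one entity dict into a flag string could look like this; to_cli_args is illustrative, not the harness's actual code.

    def to_cli_args(entity, skip=("entity_id", "log_filename", "config_filename")):
        # e.g. {"request-num-acks": "-1"} -> "--request-num-acks -1"
        return " ".join("--%s %s" % (k, v)
                        for k, v in sorted(entity.items()) if k not in skip)

    print(to_cli_args({"compression-codec": "1", "request-num-acks": "-1", "threads": "5"}))
    # --compression-codec 1 --request-num-acks -1 --threads 5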

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4018/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4018/cluster_config.json?rev=1407680&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4018/cluster_config.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4018/cluster_config.json Fri Nov  9 22:58:23 2012
@@ -0,0 +1,76 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9990"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9991"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9992"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9993"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9997"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9998"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9999"
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9191"
+        }
+    ]
+}

Added: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4018/testcase_4018_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4018/testcase_4018_properties.json?rev=1407680&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4018/testcase_4018_properties.json (added)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_4018/testcase_4018_properties.json Fri Nov  9 22:58:23 2012
@@ -0,0 +1,103 @@
+{
+  "description": {"01":"Broker Log Retention (Replica Factor < No. of Brokers) : 1. acks => 1",
+                  "02":"Produce and consume messages to 2 topics - 2 partitions",
+                  "03":"This test sends messages to 2 replicas",
+                  "04":"To trigger leader election: find the leader and terminate by controlled failure (kill -15)",
+                  "05":"Restart the terminated broker",
+                  "06":"Look up brokers' log4j messages and verify that the leader is re-elected successfully",
+                  "07":"At the end it verifies the log size and contents",
+                  "08":"Use a consumer to verify no message loss.",
+                  "09":"Producer dimensions : mode:async, acks:1, comp:1",
+                  "10":"Log segment size    : 102400"
+  },
+  "testcase_args": {
+    "broker_type": "leader",
+    "bounce_broker": "true",
+    "replica_factor": "2",
+    "num_partition": "2",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "broker_down_time_in_sec": "5",
+    "message_producing_free_time_sec": "15",
+    "log_retention_test": "true"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2188",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_2188.log",
+      "config_filename": "zookeeper_2188.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "brokerid": "1",
+      "log.file.size": "102400",
+      "log.retention.size": "1048576",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "log_filename": "kafka_server_9091.log",
+      "config_filename": "kafka_server_9091.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "brokerid": "2",
+      "log.file.size": "102400",
+      "log.retention.size": "1048576",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "log_filename": "kafka_server_9092.log",
+      "config_filename": "kafka_server_9092.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "brokerid": "3",
+      "log.file.size": "102400",
+      "log.retention.size": "1048576",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "log_filename": "kafka_server_9093.log",
+      "config_filename": "kafka_server_9093.properties"
+    },
+    {
+      "entity_id": "4",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "500",
+      "request-num-acks": "1",
+      "sync":"false",
+      "log_filename": "producer_performance_4.log",
+      "config_filename": "producer_performance_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test_2",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "500",
+      "request-num-acks": "1",
+      "sync":"false",
+      "log_filename": "producer_performance_5.log",
+      "config_filename": "producer_performance_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "test_1",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer_6.log",
+      "config_filename": "console_consumer_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "topic": "test_2",
+      "groupid": "mytestgroup",
+      "consumer-timeout-ms": "60000",
+      "log_filename": "console_consumer_7.log",
+      "config_filename": "console_consumer_7.properties"
+    }
+  ]
+}

Modified: incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json?rev=1407680&r1=1407679&r2=1407680&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json (original)
+++ incubator/kafka/branches/0.8/system_test/testcase_to_run_all.json Fri Nov  9 22:58:23 2012
@@ -38,6 +38,14 @@
         "testcase_0121",
         "testcase_0122",
         "testcase_0123",
+        "testcase_0124",
+        "testcase_0125",
+        "testcase_0126",
+        "testcase_0127",
+
+        "testcase_0131",
+        "testcase_0132",
+        "testcase_0133",
 
         "testcase_0151",
         "testcase_0152",
@@ -73,15 +81,41 @@
         "testcase_0305",
         "testcase_0306",
         "testcase_0307",
-        "testcase_0308"
+        "testcase_0308",
+
+        "testcase_4001",
+        "testcase_4002",
+        "testcase_4003",
+        "testcase_4004",
+        "testcase_4005",
+        "testcase_4006",
+        "testcase_4007",
+        "testcase_4008",
+
+        "testcase_4011",
+        "testcase_4012",
+        "testcase_4013",
+        "testcase_4014",
+        "testcase_4015",
+        "testcase_4016",
+        "testcase_4017",
+        "testcase_4018"
     ],
 
     "MigrationToolTest"  : [
-        "testcase_9001"
+        "testcase_9001",
+        "testcase_9003",
+        "testcase_9004",
+        "testcase_9005",
+        "testcase_9006"
     ],
 
     "MirrorMakerTest"  : [
         "testcase_5001",
-        "testcase_5002"
+        "testcase_5002",
+        "testcase_5003",
+        "testcase_5004",
+        "testcase_5005",
+        "testcase_5006"
     ]
 }
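
testcase_to_run_all.json is a flat map from suite name to the testcase directories to run, so the new 4xxx, 5xxx, and 9xxx entries presumably take effect without code changes. A minimal sketch of consuming it, not the runner's actual code:

    import json

    with open("testcase_to_run_all.json") as f:
        suites = json.load(f)

    for suite_name, testcases in sorted(suites.items()):
        for testcase in testcases:          # e.g. "testcase_4016"
            print("run %s :: %s" % (suite_name, testcase))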

Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1407680&r1=1407679&r2=1407680&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Fri Nov  9 22:58:23 2012
@@ -174,6 +174,20 @@ def collect_logs_from_remote_hosts(syste
             logger.debug("executing command [" + cmdStr + "]", extra=d)
             system_test_utils.sys_call(cmdStr)
 
+        # ==============================
+        # collect ZK log
+        # ==============================
+        if role == "zookeeper":
+            dataLogPathName = system_test_utils.get_data_by_lookup_keyval(
+                                  testcaseEnv.testcaseConfigsList, "entity_id", entity_id, "dataDir")
+
+            cmdList = ["scp -r",
+                       hostname + ":" + dataLogPathName,
+                       logPathName]
+            cmdStr  = " ".join(cmdList)
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
     # ==============================
     # collect dashboards file
     # ==============================
@@ -485,14 +499,17 @@ def start_brokers(systemTestEnv, testcas
         start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
 
 
-def start_mirror_makers(systemTestEnv, testcaseEnv):
-    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+def start_mirror_makers(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
 
-    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( 
-        clusterEntityConfigDictList, "role", "mirror_maker", "entity_id")
+    if onlyThisEntityId is not None:
+        start_entity_in_background(systemTestEnv, testcaseEnv, onlyThisEntityId)
+    else:
+        clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+        brokerEntityIdList          = system_test_utils.get_data_from_list_of_dicts( 
+                                      clusterEntityConfigDictList, "role", "mirror_maker", "entity_id")
 
-    for brokerEntityId in brokerEntityIdList:
-        start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
+        for brokerEntityId in brokerEntityIdList:
+            start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
 
 
 def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict):
@@ -1040,9 +1057,10 @@ def get_message_checksum(logPathName):
         if not "checksum:" in line:
             continue
         else:
-            matchObj = re.match('.*checksum:(\d*?).*', line)
+            matchObj = re.match('.*checksum:(\d*).*', line)
             if matchObj is not None:
-                messageChecksumList.append( matchObj.group(1) )
+                checksum = matchObj.group(1)
+                messageChecksumList.append( checksum )
             else:
                 logger.error("unexpected log line : " + line, extra=d)
 
@@ -1311,52 +1329,70 @@ def stop_all_remote_running_processes(sy
         stop_remote_entity(systemTestEnv, entityId, zkParentPid)
 
 
-def start_migration_tool(systemTestEnv, testcaseEnv):
+def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None):
     clusterConfigList = systemTestEnv.clusterEntityConfigDictList
     migrationToolConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "migration_tool")
 
-    migrationToolConfig = migrationToolConfigList[0]
-    host              = migrationToolConfig["hostname"]
-    entityId          = migrationToolConfig["entity_id"]
-    jmxPort           = migrationToolConfig["jmx_port"] 
-    role              = migrationToolConfig["role"] 
-    kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "kafka_home")
-    javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "java_home")
-    jmxPort           = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "jmx_port")
-    kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
+    for migrationToolConfig in migrationToolConfigList:
 
-    logger.info("starting kafka migration tool", extra=d)
-    migrationToolLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "migration_tool", entityId, "default")
-    migrationToolLogPathName = migrationToolLogPath + "/migration_tool.log"
-    testcaseEnv.userDefinedEnvVarDict["migrationToolLogPathName"] = migrationToolLogPathName
+        entityId = migrationToolConfig["entity_id"]
 
-    testcaseConfigsList = testcaseEnv.testcaseConfigsList
-    numProducers    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "num.producers")
-    numStreams      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "num.streams")
-    producerConfig  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer.config")
-    consumerConfig  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "consumer.config")
-    zkClientJar     = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "zkclient.01.jar")
-    kafka07Jar      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "kafka.07.jar")
-    whiteList       = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "whitelist")
-    logFile         = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename")
-
-    cmdList = ["ssh " + host,
-               "'JAVA_HOME=" + javaHome,
-               "JMX_PORT=" + jmxPort,
-               kafkaRunClassBin + " kafka.tools.KafkaMigrationTool",
-               "--whitelist="        + whiteList,
-               "--num.producers="    + numProducers,
-               "--num.streams="      + numStreams,
-               "--producer.config="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + producerConfig,
-               "--consumer.config="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + consumerConfig,
-               "--zkclient.01.jar="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + zkClientJar,
-               "--kafka.07.jar="     + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + kafka07Jar,
-               " &> " + migrationToolLogPath + "/migrationTool.log",
-               " & echo pid:$! > " + migrationToolLogPath + "/entity_" + entityId + "_pid'"]
+        if onlyThisEntityId is None or entityId == onlyThisEntityId:
+
+            host              = migrationToolConfig["hostname"]
+            jmxPort           = migrationToolConfig["jmx_port"] 
+            role              = migrationToolConfig["role"] 
+            kafkaHome         = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "kafka_home")
+            javaHome          = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "java_home")
+            jmxPort           = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "jmx_port")
+            kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
+
+            logger.info("starting kafka migration tool", extra=d)
+            migrationToolLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "migration_tool", entityId, "default")
+            migrationToolLogPathName = migrationToolLogPath + "/migration_tool.log"
+            testcaseEnv.userDefinedEnvVarDict["migrationToolLogPathName"] = migrationToolLogPathName
+
+            testcaseConfigsList = testcaseEnv.testcaseConfigsList
+            numProducers    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "num.producers")
+            numStreams      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "num.streams")
+            producerConfig  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "producer.config")
+            consumerConfig  = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "consumer.config")
+            zkClientJar     = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "zkclient.01.jar")
+            kafka07Jar      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "kafka.07.jar")
+            whiteList       = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "whitelist")
+            logFile         = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "log_filename")
+
+            cmdList = ["ssh " + host,
+                       "'JAVA_HOME=" + javaHome,
+                       "JMX_PORT=" + jmxPort,
+                       kafkaRunClassBin + " kafka.tools.KafkaMigrationTool",
+                       "--whitelist="        + whiteList,
+                       "--num.producers="    + numProducers,
+                       "--num.streams="      + numStreams,
+                       "--producer.config="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + producerConfig,
+                       "--consumer.config="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + consumerConfig,
+                       "--zkclient.01.jar="  + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + zkClientJar,
+                       "--kafka.07.jar="     + systemTestEnv.SYSTEM_TEST_BASE_DIR + "/" + kafka07Jar,
+                       " &> " + migrationToolLogPath + "/migrationTool.log",
+                       " & echo pid:$! > " + migrationToolLogPath + "/entity_" + entityId + "_pid'"]
+
+            cmdStr = " ".join(cmdList)
+            logger.debug("executing command: [" + cmdStr + "]", extra=d)
+            system_test_utils.async_sys_call(cmdStr)
+            time.sleep(5)
+
+            pidCmdStr = "ssh " + host + " 'cat " + migrationToolLogPath + "/entity_" + entityId + "_pid' 2> /dev/null"
+            logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+            subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+
+            # keep track of the remote entity pid in a dictionary
+            for line in subproc.stdout.readlines():
+                if line.startswith("pid"):
+                    line = line.rstrip('\n')
+                    logger.debug("found pid line: [" + line + "]", extra=d)
+                    tokens = line.split(':')
+                    testcaseEnv.entityMigrationToolParentPidDict[entityId] = tokens[1]
 
-    cmdStr = " ".join(cmdList)
-    logger.debug("executing command: [" + cmdStr + "]", extra=d)
-    system_test_utils.async_sys_call(cmdStr)
 
 def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv):
     validationStatusDict        = testcaseEnv.validationStatusDict
@@ -1392,30 +1428,40 @@ def validate_07_08_migrated_data_matched
         consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", matchingConsumerEntityId, "default")
         consumerLogPathName = consumerLogPath + "/console_consumer.log"
 
-        producerMsgChecksumList   = get_message_checksum(producerLogPathName)
-        consumerMsgChecksumList   = get_message_checksum(consumerLogPathName)
-        producerMsgChecksumSet    = set(producerMsgChecksumList)
-        consumerMsgChecksumSet    = set(consumerMsgChecksumList)
+        producerMsgChecksumList      = get_message_checksum(producerLogPathName)
+        consumerMsgChecksumList      = get_message_checksum(consumerLogPathName)
+        producerMsgChecksumSet       = set(producerMsgChecksumList)
+        consumerMsgChecksumSet       = set(consumerMsgChecksumList)
+        producerMsgChecksumUniqList  = list(producerMsgChecksumSet)
+        consumerMsgChecksumUniqList  = list(consumerMsgChecksumSet)
 
         missingMsgChecksumInConsumer = producerMsgChecksumSet - consumerMsgChecksumSet
 
+        logger.debug("size of producerMsgChecksumList      : " + str(len(producerMsgChecksumList)), extra=d)
+        logger.debug("size of consumerMsgChecksumList      : " + str(len(consumerMsgChecksumList)), extra=d)
+        logger.debug("size of producerMsgChecksumSet       : " + str(len(producerMsgChecksumSet)), extra=d)
+        logger.debug("size of consumerMsgChecksumSet       : " + str(len(consumerMsgChecksumSet)), extra=d)
+        logger.debug("size of producerMsgChecksumUniqList  : " + str(len(producerMsgChecksumUniqList)), extra=d)
+        logger.debug("size of consumerMsgChecksumUniqList  : " + str(len(consumerMsgChecksumUniqList)), extra=d)
+        logger.debug("size of missingMsgChecksumInConsumer : " + str(len(missingMsgChecksumInConsumer)), extra=d)
+
         outfile = open(msgChecksumMissingInConsumerLogPathName, "w")
         for id in missingMsgChecksumInConsumer:
             outfile.write(id + "\n")
         outfile.close()
 
-        logger.info("no. of unique messages on topic [" + topic + "] sent from publisher  : " + str(len(producerMsgChecksumList)), extra=d)
-        logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgChecksumList)), extra=d)
-        validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(producerMsgChecksumList))
-        validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(consumerMsgChecksumList))
+        logger.info("no. of messages on topic [" + topic + "] sent from producer          : " + str(len(producerMsgChecksumList)), extra=d)
+        logger.info("no. of messages on topic [" + topic + "] received by consumer        : " + str(len(consumerMsgChecksumList)), extra=d)
+        logger.info("no. of unique messages on topic [" + topic + "] sent from producer   : " + str(len(producerMsgChecksumUniqList)),  extra=d)
+        logger.info("no. of unique messages on topic [" + topic + "] received by consumer : " + str(len(consumerMsgChecksumUniqList)),  extra=d)
+        validationStatusDict["Unique messages from producer on [" + topic + "]"] = str(len(list(producerMsgChecksumSet)))
+        validationStatusDict["Unique messages from consumer on [" + topic + "]"] = str(len(list(consumerMsgChecksumSet)))
 
-        if ( len(missingMsgChecksumInConsumer) == 0 and len(producerMsgChecksumList) > 0 ):
+        if ( len(producerMsgChecksumList) > 0 and len(list(producerMsgChecksumSet)) == len(list(consumerMsgChecksumSet))):
             validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "PASSED"
-            #return True
         else:
             validationStatusDict["Validate for data matched on topic [" + topic + "]"] = "FAILED"
             logger.info("See " + msgChecksumMissingInConsumerLogPathName + " for missing MessageID", extra=d)
-            #return False
 
 def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName="source"):
 
@@ -1542,7 +1588,7 @@ def validate_broker_log_segment_checksum
     else:
         validationStatusDict["Validate for merged log segment checksum in cluster [" + clusterName + "]"] = "FAILED"
 
-def start_simple_consumer(systemTestEnv, testcaseEnv):
+def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None):
 
     clusterList        = systemTestEnv.clusterEntityConfigDictList
     consumerConfigList = system_test_utils.get_dict_from_list_of_dicts(clusterList, "role", "console_consumer")
@@ -1586,45 +1632,42 @@ def start_simple_consumer(systemTestEnv,
             numPartitions = int(numPartitions)
 
         replicaIndex   = 1
+        startingOffset = -2
         brokerPortList = brokerListStr.split(',')
         for brokerPort in brokerPortList:
 
-            k = 0
-            while (k < numPartitions):
-                logger.info("starting debug consumer for replica on [" + brokerPort + "] partition [" + str(k) + "]", extra=d)
+            partitionId = 0
+            while (partitionId < numPartitions):
+                logger.info("starting debug consumer for replica on [" + brokerPort + "] partition [" + str(partitionId) + "]", extra=d)
+
+                if minStartingOffsetDict is not None:
+                    topicPartition = topic + "-" + str(partitionId)
+                    startingOffset = minStartingOffsetDict[topicPartition]
+
+                outputFilePathName = consumerLogPath + "/simple_consumer_" + topic + "-" + str(partitionId) + "_r" + str(replicaIndex) + ".log"
                 brokerPortLabel = brokerPort.replace(":", "_")
                 cmdList = ["ssh " + host,
                            "'JAVA_HOME=" + javaHome,
                            kafkaRunClassBin + " kafka.tools.SimpleConsumerShell",
                            "--broker-list " + brokerListStr,
                            "--topic " + topic,
-                           "--partition " + str(k),
+                           "--partition " + str(partitionId),
                            "--replica " + str(replicaIndex),
+                           "--offset " + str(startingOffset),
                            "--no-wait-at-logend ",
-                           " >> " + consumerLogPath + "/simple_consumer_" + str(replicaIndex) + ".log",
+                           " > " + outputFilePathName,
                            " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"]
     
                 cmdStr = " ".join(cmdList)
     
                 logger.debug("executing command: [" + cmdStr + "]", extra=d)
-                system_test_utils.async_sys_call(cmdStr)
-                time.sleep(2)
-    
-                pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'"
-                logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
-                subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
-    
-                # keep track of the remote entity pid in a dictionary
-                for line in subproc.stdout.readlines():
-                    if line.startswith("pid"):
-                        line = line.rstrip('\n')
-                        logger.debug("found pid line: [" + line + "]", extra=d)
-                        tokens = line.split(':')
-                        testcaseEnv.consumerHostParentPidDict[host] = tokens[1]
-    
-                logger.info("sleeping for 5 sec",extra=d)
-                time.sleep(5)
-                k += 1
+                subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr)
+                # dummy for-loop to wait until the process is completed
+                for line in subproc_1.stdout.readlines():
+                    pass 
+                time.sleep(1)
+   
+                partitionId += 1
             replicaIndex += 1
 
 def validate_simple_consumer_data_matched(systemTestEnv, testcaseEnv):
@@ -1733,4 +1776,187 @@ def get_controller_attributes(systemTest
                                           tcConfigsList, "brokerid", brokerid, "entity_id")
     return controllerDict
 
+def getMinCommonStartingOffset(systemTestEnv, testcaseEnv, clusterName="source"):
+
+    brokerLogStartOffsetDict = {}
+    minCommonStartOffsetDict = {}
+
+    tcConfigsList        = testcaseEnv.testcaseConfigsList
+    clusterConfigList    = systemTestEnv.clusterEntityConfigDictList
+    allBrokerConfigList  = system_test_utils.get_dict_from_list_of_dicts(clusterConfigList, "role", "broker")
+    brokerEntityIdList   = system_test_utils.get_data_from_list_of_dicts(allBrokerConfigList, "cluster_name", clusterName, "entity_id")
+
+    # loop through all brokers
+    for brokerEntityId in sorted(brokerEntityIdList):
+        # remoteLogSegmentPathName : /tmp/kafka_server_4_logs
+        # => remoteLogSegmentDir   : kafka_server_4_logs
+        remoteLogSegmentPathName = system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", brokerEntityId, "log.dir")
+        remoteLogSegmentDir      = os.path.basename(remoteLogSegmentPathName)
+        logPathName              = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
+        localLogSegmentPath      = logPathName + "/" + remoteLogSegmentDir
+
+        # loop through all topicPartition directories such as : test_1-0, test_1-1, ... 
+        for topicPartition in sorted(os.listdir(localLogSegmentPath)):
+            # found a topic-partition directory
+            if os.path.isdir(localLogSegmentPath + "/" + topicPartition):
+
+                # startingOffsetKey : <brokerEntityId>:<topicPartition>  (eg. 1:test_1-0)
+                startingOffsetKey = brokerEntityId + ":" + topicPartition
+
+                # log segment files are located in : localLogSegmentPath + "/" + topicPartition
+                # sort the log segment files under each topic-partition
+                for logFile in sorted(os.listdir(localLogSegmentPath + "/" + topicPartition)):
+
+                    # logFile is located at:
+                    # system_test/xxxx_testsuite/testcase_xxxx/logs/broker-1/kafka_server_1_logs/test_1-0/00000000000000003800.log
+                    if logFile.endswith(".log"):
+                        matchObj = re.match("0*(.*)\.log", logFile)    # remove the leading zeros & the file extension
+                        startingOffset = matchObj.group(1)             # this is the starting offset from the file name
+                        if len(startingOffset) == 0:                   # when log filename is: 00000000000000000000.log
+                            startingOffset = "0"
+
+                        # starting offset of a topic-partition can be retrieved from the filename of the first log segment
+                        # => break out of this innermost for-loop after processing the first log segment file
+                        brokerLogStartOffsetDict[startingOffsetKey] = startingOffset
+                        break
+
+    # brokerLogStartOffsetDict is like this:
+    # {u'1:test_1-0': u'400',
+    #  u'1:test_1-1': u'400',
+    #  u'1:test_2-0': u'200',
+    #  u'1:test_2-1': u'200',
+    #  u'2:test_1-0': u'400',
+    #  u'2:test_1-1': u'400',
+    #  u'2:test_2-0': u'200',
+    #  u'2:test_2-1': u'200',
+    #  u'3:test_1-0': '0',
+    #  u'3:test_1-1': '0',
+    #  u'3:test_2-0': '0',
+    #  u'3:test_2-1': '0'}
+
+    # loop through brokerLogStartOffsetDict to get the min common starting offset for each topic-partition    
+    for brokerTopicPartition in sorted(brokerLogStartOffsetDict.iterkeys()):
+        topicPartition = brokerTopicPartition.split(':')[1]
+
+        if topicPartition in minCommonStartOffsetDict:
+            # key exists => if the new value is greater, replace the existing value with new
+            if minCommonStartOffsetDict[topicPartition] < brokerLogStartOffsetDict[brokerTopicPartition]:
+                minCommonStartOffsetDict[topicPartition] = brokerLogStartOffsetDict[brokerTopicPartition]
+        else:
+            # key doesn't exist => add it to the dictionary
+            minCommonStartOffsetDict[topicPartition] = brokerLogStartOffsetDict[brokerTopicPartition]
+
+    # returning minCommonStartOffsetDict which is like this:
+    # {u'test_1-0': u'400',
+    #  u'test_1-1': u'400',
+    #  u'test_2-0': u'200',
+    #  u'test_2-1': u'200'}
+    return minCommonStartOffsetDict
+
+def validate_simple_consumer_data_matched_across_replicas(systemTestEnv, testcaseEnv):
+    validationStatusDict        = testcaseEnv.validationStatusDict
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    consumerEntityIdList        = system_test_utils.get_data_from_list_of_dicts(
+                                  clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
+    replicaFactor               = testcaseEnv.testcaseArgumentsDict["replica_factor"]
+    numPartition                = testcaseEnv.testcaseArgumentsDict["num_partition"]
+
+    # Unique messages from producer on [test_1]  :  1500
+    # Unique messages from consumer on [test_1]  :  1500
+
+    # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r1.log  :  750
+    # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r2.log  :  750
+    # Unique messages from consumer on [test_1] at simple_consumer_test_1-0_r3.log  :  0
+
+    # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r1.log  :  0
+    # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r2.log  :  750
+    # Unique messages from consumer on [test_1] at simple_consumer_test_1-1_r3.log  :  750
+
+    # ==================================================
+
+    # Unique messages from producer on [test_2]  :  1000
+    # Unique messages from consumer on [test_2]  :  1000
+
+    # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r1.log  :  500
+    # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r2.log  :  0
+    # Unique messages from consumer on [test_2] at simple_consumer_test_2-0_r3.log  :  500
+
+    # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r1.log  :  500
+    # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r2.log  :  500
+    # Unique messages from consumer on [test_2] at simple_consumer_test_2-1_r3.log  :  0
+
+    mismatchCounter = 0
+    for consumerEntityId in consumerEntityIdList:
+
+        topic           = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+        consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", consumerEntityId, "default")
+
+        replicaIdxMsgCountDictList = []
+        # replicaIdxMsgCountDictList is being used as follows:
+        #
+        # the above replica message count will be organized as follows:
+        # index of the list would map to the partitionId
+        # each element in the list maps to the replicaIdx-MessageCount
+        # to validate that :
+        # 1. there should be "no. of broker" of non-zero message count and they are equal
+        # 2. there should be "no. of broker - replication factor" of zero count
+        # [{"1": "750", "2": "750", "3": "0"  },
+        #  {"1": "0"  , "2": "750", "3": "750"}]
+
+        j = 0
+        while j < int(numPartition):
+            newDict = {}
+            replicaIdxMsgCountDictList.append(newDict)
+            j += 1
+
+        for logFile in sorted(os.listdir(consumerLogPath)):
+
+            if logFile.startswith("simple_consumer_") and logFile.endswith(".log"):
+                matchObj    = re.match("simple_consumer_"+topic+"-(\d*)_r(\d*)\.log" , logFile)
+                partitionId = int(matchObj.group(1))
+                replicaIdx  = int(matchObj.group(2))
+
+                consumerLogPathName   = consumerLogPath + "/" + logFile
+                consumerMsgIdList     = get_message_id(consumerLogPathName)
+                consumerMsgIdSet      = set(consumerMsgIdList)
+
+                replicaIdxMsgCountDictList[partitionId][replicaIdx] = len(consumerMsgIdSet)
+
+                logger.info("no. of unique messages on topic [" + topic + "] at " + logFile + " : " + str(len(consumerMsgIdSet)), extra=d)
+                validationStatusDict["Unique messages from consumer on [" + topic + "] at " + logFile] = str(len(consumerMsgIdSet))
+
+        pprint.pprint(replicaIdxMsgCountDictList)
+
+        partitionId = 0
+        while partitionId < int(numPartition):
+            zeroMsgCounter    = 0
+            nonZeroMsgCounter = 0
+            nonZeroMsgValue   = -1
+
+            for replicaIdx in sorted(replicaIdxMsgCountDictList[partitionId].iterkeys()):
+                if replicaIdxMsgCountDictList[partitionId][int(replicaIdx)] == 0:
+                    zeroMsgCounter += 1
+                else:
+                    if nonZeroMsgValue == -1:
+                        nonZeroMsgValue = replicaIdxMsgCountDictList[partitionId][int(replicaIdx)]
+                    else:
+                        if nonZeroMsgValue != replicaIdxMsgCountDictList[partitionId][int(replicaIdx)]:
+                            mismatchCounter += 1
+                    nonZeroMsgCounter += 1
+            partitionId += 1
+
+            logger.info("topic " + topic + " : no. of brokers with zero msg count     : " + str(zeroMsgCounter), extra=d)
+            logger.info("topic " + topic + " : no. of brokers with non-zero msg count : " + str(nonZeroMsgCounter), extra=d)
+            logger.info("topic " + topic + " : non-zero brokers msg count             : " + str(nonZeroMsgValue), extra=d)
+
+        if mismatchCounter == 0:
+            validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "PASSED"
+        else:
+            validationStatusDict["Validate for data matched on topic [" + topic + "] across replicas"] = "FAILED"
+
+
+
+
+
+
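
The getMinCommonStartingOffset helper added above reduces the per-broker log segment start offsets to the highest start offset shared by all replicas of each topic-partition. Below is a standalone sketch of that reduction over the sample data from its comments; note the committed code compares offsets as strings, which works for these sample values but would misorder longer ones (lexicographically "1000" < "400"), so this sketch compares ints.

    broker_log_start_offsets = {
        "1:test_1-0": "400", "2:test_1-0": "400", "3:test_1-0": "0",
        "1:test_2-0": "200", "2:test_2-0": "200", "3:test_2-0": "0",
    }

    min_common = {}
    for broker_topic_partition, offset in broker_log_start_offsets.items():
        topic_partition = broker_topic_partition.split(":")[1]
        if (topic_partition not in min_common
                or int(offset) > int(min_common[topic_partition])):
            min_common[topic_partition] = offset

    print(min_common)   # {'test_1-0': '400', 'test_2-0': '200'}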
 

Modified: incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/testcase_env.py?rev=1407680&r1=1407679&r2=1407680&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/testcase_env.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/testcase_env.py Fri Nov  9 22:58:23 2012
@@ -51,6 +51,12 @@ class TestcaseEnv():
     # { 0: 12345, 1: 12389, ... }
     entityMirrorMakerParentPidDict = {}
 
+    # dictionary of entity_id to ppid for migration tool entities
+    # key: entity_id
+    # val: ppid of broker associated to that entity_id
+    # { 0: 12345, 1: 12389, ... }
+    entityMigrationToolParentPidDict = {}
+
     # dictionary of entity_id to list of JMX ppid
     # key: entity_id
     # val: list of JMX ppid associated to that entity_id
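
The new entityMigrationToolParentPidDict mirrors the existing parent-pid dictionaries, so teardown can stop migration tool entities the same way other entities are stopped. A hedged sketch, reusing the stop_remote_entity signature visible in the kafka_system_test_utils.py diff above:

    for entityId, parentPid in testcaseEnv.entityMigrationToolParentPidDict.items():
        stop_remote_entity(systemTestEnv, entityId, parentPid)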


