kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject kafka git commit: KAFKA-5849; Add process stop, round trip workload, partitioned test
Date Wed, 20 Dec 2017 21:33:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 488ea4b9f -> 760d86a97


KAFKA-5849; Add process stop, round trip workload, partitioned test

* Implement process stop faults via SIGSTOP / SIGCONT

* Implement RoundTripWorkload, which both sends messages, and confirms that they are received at least once.

* Allow Trogdor tasks to block until other Trogdor tasks are complete.

* Add CreateTopicsWorker, which can be a building block for a lot of tests.

* Simplify how TaskSpec subclasses in ducktape serialize themselves to JSON.

* Implement some fault injection tests in round_trip_workload_test.py

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #4323 from cmccabe/KAFKA-5849


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/760d86a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/760d86a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/760d86a9

Branch: refs/heads/trunk
Commit: 760d86a970d20643f1b8f71e78c60a7ad2be7003
Parents: 488ea4b
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Wed Dec 20 21:35:33 2017 +0000
Committer: Rajini Sivaram <rajinisivaram@googlemail.com>
Committed: Wed Dec 20 21:35:33 2017 +0000

----------------------------------------------------------------------
 .../trogdor/files_unreadable_fault_spec.py      |  30 +-
 tests/kafkatest/services/trogdor/kibosh.py      |   2 +-
 .../trogdor/network_partition_fault_spec.py     |  19 +-
 .../services/trogdor/no_op_task_spec.py         |   8 +-
 .../services/trogdor/process_stop_fault_spec.py |  38 +++
 .../services/trogdor/produce_bench_workload.py  |  29 +-
 .../services/trogdor/round_trip_workload.py     |  49 +++
 tests/kafkatest/services/trogdor/task_spec.py   |  21 +-
 tests/kafkatest/services/trogdor/trogdor.py     |   4 +-
 .../tests/core/round_trip_fault_test.py         |  89 +++++
 tests/kafkatest/tests/tools/kibosh_test.py      |   2 +-
 tests/kafkatest/tests/tools/trogdor_test.py     |   7 +-
 .../kafka/trogdor/common/WorkerUtils.java       | 142 ++++++++
 .../kafka/trogdor/coordinator/TaskManager.java  |   2 +
 .../fault/ProcessStopFaultController.java       |  35 ++
 .../trogdor/fault/ProcessStopFaultSpec.java     |  66 ++++
 .../trogdor/fault/ProcessStopFaultWorker.java   |  81 +++++
 .../workload/ProduceBenchController.java        |  37 --
 .../trogdor/workload/ProduceBenchSpec.java      |  10 +-
 .../trogdor/workload/ProduceBenchWorker.java    |  94 ++---
 .../kafka/trogdor/workload/RoundTripWorker.java | 340 +++++++++++++++++++
 .../trogdor/workload/RoundTripWorkloadSpec.java |  98 ++++++
 .../apache/kafka/trogdor/task/TaskSpecTest.java |  52 +++
 23 files changed, 1070 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py b/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py
index 4f0540a..618efd3 100644
--- a/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py
+++ b/tests/kafkatest/services/trogdor/files_unreadable_fault_spec.py
@@ -34,25 +34,13 @@ class FilesUnreadableFaultSpec(TaskSpec):
         :param error_code:      The error code to use.
         """
         super(FilesUnreadableFaultSpec, self).__init__(start_ms, duration_ms)
-        self.node_names = node_names
-        self.mount_path = mount_path
-        self.prefix = prefix
-        self.error_code = error_code
+        self.message["class"] = "org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec"
+        self.message["nodeNames"] = node_names
+        self.message["mountPath"] = mount_path
+        self.message["prefix"] = prefix
+        self.message["errorCode"] = error_code
 
-    def message(self):
-        return {
-            "class": "org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec",
-            "startMs": self.start_ms,
-            "durationMs": self.duration_ms,
-            "nodeNames": self.node_names,
-            "mountPath": self.mount_path,
-            "prefix": self.prefix,
-            "errorCode": self.error_code,
-        }
-
-    def kibosh_message(self):
-        return {
-            "type": "unreadable",
-            "prefix": self.prefix,
-            "code": self.error_code,
-        }
+        self.kibosh_message = {}
+        self.kibosh_message["type"] = "unreadable"
+        self.kibosh_message["prefix"] = prefix
+        self.kibosh_message["code"] = error_code

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/services/trogdor/kibosh.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/kibosh.py b/tests/kafkatest/services/trogdor/kibosh.py
index de4e5e6..47c119e 100644
--- a/tests/kafkatest/services/trogdor/kibosh.py
+++ b/tests/kafkatest/services/trogdor/kibosh.py
@@ -133,7 +133,7 @@ class KiboshService(Service):
         :param node:        The node.
         :param specs:       An array of FaultSpec objects describing the faults.
         """
-        fault_array = [spec.kibosh_message() for spec in specs]
+        fault_array = [spec.kibosh_message for spec in specs]
         obj = { 'faults': fault_array }
         obj_json = json.dumps(obj)
         node.account.create_file(self.control_path, obj_json)

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/network_partition_fault_spec.py b/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
index 91c731e..4b8c9d3 100644
--- a/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
+++ b/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
@@ -35,20 +35,5 @@ class NetworkPartitionFaultSpec(TaskSpec):
                                 or ClusterNode objects.
         """
         super(NetworkPartitionFaultSpec, self).__init__(start_ms, duration_ms)
-        self.partitions = []
-        for partition in partitions:
-            nodes = []
-            for obj in partition:
-                if isinstance(obj, basestring):
-                    nodes.append(obj)
-                else:
-                    nodes.append(obj.name)
-            self.partitions.append(nodes)
-
-    def message(self):
-        return {
-            "class": "org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec",
-            "startMs": self.start_ms,
-            "durationMs": self.duration_ms,
-            "partitions": self.partitions,
-        }
+        self.message["class"] = "org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec"
+        self.message["partitions"] = [TaskSpec.to_node_names(p) for p in partitions]

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/services/trogdor/no_op_task_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/no_op_task_spec.py b/tests/kafkatest/services/trogdor/no_op_task_spec.py
index eb75264..9238af4 100644
--- a/tests/kafkatest/services/trogdor/no_op_task_spec.py
+++ b/tests/kafkatest/services/trogdor/no_op_task_spec.py
@@ -32,10 +32,4 @@ class NoOpTaskSpec(TaskSpec):
         :param duration_ms:     The duration in milliseconds.
         """
         super(NoOpTaskSpec, self).__init__(start_ms, duration_ms)
-
-    def message(self):
-        return {
-            "class": "org.apache.kafka.trogdor.task.NoOpTaskSpec",
-            "startMs": self.start_ms,
-            "durationMs": self.duration_ms,
-        }
+        self.message["class"] = "org.apache.kafka.trogdor.task.NoOpTaskSpec"

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/services/trogdor/process_stop_fault_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/process_stop_fault_spec.py b/tests/kafkatest/services/trogdor/process_stop_fault_spec.py
new file mode 100644
index 0000000..3315f1e
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/process_stop_fault_spec.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class ProcessStopFaultSpec(TaskSpec):
+    """
+    The specification for a process stop fault.
+    """
+
+    def __init__(self, start_ms, duration_ms, nodes, java_process_name):
+        """
+        Create a new ProcessStopFaultSpec.
+
+        :param start_ms:            The start time, as described in task_spec.py
+        :param duration_ms:         The duration in milliseconds.
+        :param nodes:               An array describing the nodes to stop processes on.  The array
+                                    may contain either node names, or ClusterNode objects.
+        :param java_process_name:   The name of the java process to stop.  This is the name which
+                                    is reported by jps, etc., not the OS-level process name.
+        """
+        super(ProcessStopFaultSpec, self).__init__(start_ms, duration_ms)
+        self.message["class"] = "org.apache.kafka.trogdor.fault.ProcessStopFaultSpec"
+        self.message["nodeNames"] = TaskSpec.to_node_names(nodes)
+        self.message["javaProcessName"] = java_process_name

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/services/trogdor/produce_bench_workload.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py b/tests/kafkatest/services/trogdor/produce_bench_workload.py
index 9d1f005..bce28c0 100644
--- a/tests/kafkatest/services/trogdor/produce_bench_workload.py
+++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py
@@ -23,27 +23,14 @@ class ProduceBenchWorkloadSpec(TaskSpec):
                  target_messages_per_sec, max_messages, producer_conf,
                  total_topics, active_topics):
         super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
-        self.producer_node = producer_node
-        self.bootstrap_servers = bootstrap_servers
-        self.target_messages_per_sec = target_messages_per_sec
-        self.max_messages = max_messages
-        self.producer_conf = producer_conf
-        self.total_topics = total_topics
-        self.active_topics = active_topics
-
-    def message(self):
-        return {
-            "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
-            "startMs": self.start_ms,
-            "durationMs": self.duration_ms,
-            "producerNode": self.producer_node,
-            "bootstrapServers": self.bootstrap_servers,
-            "targetMessagesPerSec": self.target_messages_per_sec,
-            "maxMessages": self.max_messages,
-            "producerConf": self.producer_conf,
-            "totalTopics": self.total_topics,
-            "activeTopics": self.active_topics,
-        }
+        self.message["class"] = "org.apache.kafka.trogdor.workload.ProduceBenchSpec"
+        self.message["producerNode"] = producer_node
+        self.message["bootstrapServers"] = bootstrap_servers
+        self.message["targetMessagesPerSec"] = target_messages_per_sec
+        self.message["maxMessages"] = max_messages
+        self.message["producerConf"] = producer_conf
+        self.message["totalTopics"] = total_topics
+        self.message["activeTopics"] = active_topics
 
 
 class ProduceBenchWorkloadService(Service):

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/services/trogdor/round_trip_workload.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/round_trip_workload.py b/tests/kafkatest/services/trogdor/round_trip_workload.py
new file mode 100644
index 0000000..588bba8
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/round_trip_workload.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.services.service import Service
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class RoundTripWorkloadSpec(TaskSpec):
+    def __init__(self, start_ms, duration_ms, client_node, bootstrap_servers,
+                 target_messages_per_sec, partition_assignments, max_messages):
+        super(RoundTripWorkloadSpec, self).__init__(start_ms, duration_ms)
+        self.message["class"] = "org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec"
+        self.message["clientNode"] = client_node
+        self.message["bootstrapServers"] = bootstrap_servers
+        self.message["targetMessagesPerSec"] = target_messages_per_sec
+        self.message["partitionAssignments"] = partition_assignments
+        self.message["maxMessages"] = max_messages
+
+
+class RoundTripWorkloadService(Service):
+    def __init__(self, context, kafka):
+        Service.__init__(self, context, num_nodes=1)
+        self.bootstrap_servers = kafka.bootstrap_servers(validate=False)
+        self.client_node = self.nodes[0].account.hostname
+
+    def free(self):
+        Service.free(self)
+
+    def wait_node(self, node, timeout_sec=None):
+        pass
+
+    def stop_node(self, node):
+        pass
+
+    def clean_node(self, node):
+        pass

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/services/trogdor/task_spec.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/task_spec.py b/tests/kafkatest/services/trogdor/task_spec.py
index 61a080a..ae457ae 100644
--- a/tests/kafkatest/services/trogdor/task_spec.py
+++ b/tests/kafkatest/services/trogdor/task_spec.py
@@ -32,14 +32,23 @@ class TaskSpec(object):
         :param start_ms:        The target start time in milliseconds since the epoch.
         :param duration_ms:     The duration in milliseconds.
         """
-        self.start_ms = start_ms
-        self.duration_ms = duration_ms
+        self.message = {
+            'startMs': start_ms,
+            'durationMs': duration_ms
+        }
 
-    def message(self):
+    @staticmethod
+    def to_node_names(nodes):
         """
-        Return a message suitable for sending to the Trogdor daemon.
+        Convert an array of nodes or node names to an array of node names.
         """
-        raise NotImplemented
+        node_names = []
+        for obj in nodes:
+            if isinstance(obj, basestring):
+                node_names.append(obj)
+            else:
+                node_names.append(obj.name)
+        return node_names
 
     def __str__(self):
-        return json.dumps(self.message())
+        return json.dumps(self.message)

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/services/trogdor/trogdor.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/trogdor.py b/tests/kafkatest/services/trogdor/trogdor.py
index a4fcfb5..4d514f2 100644
--- a/tests/kafkatest/services/trogdor/trogdor.py
+++ b/tests/kafkatest/services/trogdor/trogdor.py
@@ -171,7 +171,7 @@ class TrogdorService(KafkaPathResolverMixin, Service):
                 stdout_stderr_capture_path)
         node.account.ssh(cmd)
         with node.account.monitor_log(log_path) as monitor:
-            monitor.wait_until("Starting %s process." % daemon_name, timeout_sec=60, backoff_sec=.25,
+            monitor.wait_until("Starting %s process." % daemon_name, timeout_sec=60, backoff_sec=.10,
                                err_msg=("%s on %s didn't finish startup" % (daemon_name, node.name)))
 
     def wait_node(self, node, timeout_sec=None):
@@ -260,7 +260,7 @@ class TrogdorService(KafkaPathResolverMixin, Service):
         :param id:          The task id.
         :param spec:        The task spec.
         """
-        self._coordinator_post("task/create", { "id": id, "spec": spec.message()})
+        self._coordinator_post("task/create", { "id": id, "spec": spec.message})
         return TrogdorTask(id, self)
 
     def stop_task(self, id):

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/tests/core/round_trip_fault_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py
new file mode 100644
index 0000000..f03d6a1
--- /dev/null
+++ b/tests/kafkatest/tests/core/round_trip_fault_test.py
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from ducktape.tests.test import Test
+from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.trogdor.process_stop_fault_spec import ProcessStopFaultSpec
+from kafkatest.services.trogdor.round_trip_workload import RoundTripWorkloadService, RoundTripWorkloadSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class RoundTripFaultTest(Test):
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(RoundTripFaultTest, self).__init__(test_context)
+        self.zk = ZookeeperService(test_context, num_nodes=3)
+        self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk)
+        self.workload_service = RoundTripWorkloadService(test_context, self.kafka)
+        self.trogdor = TrogdorService(context=self.test_context,
+                                      client_services=[self.zk, self.kafka, self.workload_service])
+        self.round_trip_spec = RoundTripWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                     self.workload_service.client_node,
+                                     self.workload_service.bootstrap_servers,
+                                     target_messages_per_sec=10000,
+                                     partition_assignments={0: [0,1,2]},
+                                     max_messages=100000)
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+        self.trogdor.start()
+
+    def teardown(self):
+        self.trogdor.stop()
+        self.kafka.stop()
+        self.zk.stop()
+
+    def test_round_trip_workload(self):
+        workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
+        workload1.wait_for_done(timeout_sec=600)
+
+    def test_round_trip_workload_with_broker_partition(self):
+        workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
+        time.sleep(2)
+        part1 = [self.kafka.nodes[0]]
+        part2 = self.kafka.nodes[1:] + [self.workload_service.nodes[0]] + self.zk.nodes
+        partition1_spec = NetworkPartitionFaultSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                    [part1, part2])
+        partition1 = self.trogdor.create_task("partition1", partition1_spec)
+        workload1.wait_for_done(timeout_sec=600)
+        partition1.stop()
+        partition1.wait_for_done()
+
+    def test_produce_consume_with_broker_pause(self):
+        workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
+        time.sleep(2)
+        stop1_spec = ProcessStopFaultSpec(0, TaskSpec.MAX_DURATION_MS, [self.kafka.nodes[0]],
+                                           self.kafka.java_class_name())
+        stop1 = self.trogdor.create_task("stop1", stop1_spec)
+        workload1.wait_for_done(timeout_sec=600)
+        stop1.stop()
+        stop1.wait_for_done()
+        self.kafka.stop_node(self.kafka.nodes[0], False)
+
+    def test_produce_consume_with_client_partition(self):
+        workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
+        time.sleep(2)
+        part1 = [self.workload_service.nodes[0]]
+        part2 = self.kafka.nodes + self.zk.nodes
+        partition1_spec = NetworkPartitionFaultSpec(0, 60000, [part1, part2])
+        stop1 = self.trogdor.create_task("stop1", partition1_spec)
+        workload1.wait_for_done(timeout_sec=600)
+        stop1.stop()
+        stop1.wait_for_done()

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/tests/tools/kibosh_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/kibosh_test.py b/tests/kafkatest/tests/tools/kibosh_test.py
index 5844c27..d31be87 100644
--- a/tests/kafkatest/tests/tools/kibosh_test.py
+++ b/tests/kafkatest/tests/tools/kibosh_test.py
@@ -72,7 +72,7 @@ class KiboshTest(Test):
 
         def check(self, node):
             fault_json = self.kibosh.get_fault_json(node)
-            expected_json = json.dumps({"faults": [spec.kibosh_message()]})
+            expected_json = json.dumps({"faults": [spec.kibosh_message]})
             self.logger.info("Read back: [%s].  Expected: [%s]." % (fault_json, expected_json))
             return fault_json == expected_json
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tests/kafkatest/tests/tools/trogdor_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/trogdor_test.py b/tests/kafkatest/tests/tools/trogdor_test.py
index 44d00b2..07fc318 100644
--- a/tests/kafkatest/tests/tools/trogdor_test.py
+++ b/tests/kafkatest/tests/tools/trogdor_test.py
@@ -76,9 +76,10 @@ class TrogdorTest(Test):
         self.set_up_trogdor(3)
         spec = NetworkPartitionFaultSpec(0, TaskSpec.MAX_DURATION_MS,
                                             [[self.agent_nodes[0]], self.agent_nodes[1:]])
-        assert 2 == len(spec.partitions)
-        assert [self.agent_nodes[0].name] == spec.partitions[0]
-        assert [self.agent_nodes[1].name, self.agent_nodes[2].name] == spec.partitions[1]
+        partitions = spec.message["partitions"]
+        assert 2 == len(partitions)
+        assert [self.agent_nodes[0].name] == partitions[0]
+        assert [self.agent_nodes[1].name, self.agent_nodes[2].name] == partitions[1]
         self.trogdor.create_task("partition0", spec)
         def verify_nodes_partitioned():
             if node_is_reachable(self.agent_nodes[0], self.agent_nodes[1]):

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
new file mode 100644
index 0000000..58f8278
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Utilities for Trogdor TaskWorkers.
+ */
+public final class WorkerUtils {
+    /**
+     * Handle an exception in a TaskWorker.
+     *
+     * @param log               The logger to use.
+     * @param what              The component that had the exception.
+     * @param exception         The exception.
+     * @param doneFuture        The TaskWorker's doneFuture
+     * @throws KafkaException   A wrapped version of the exception.
+     */
+    public static void abort(Logger log, String what, Throwable exception,
+            KafkaFutureImpl<String> doneFuture) throws KafkaException {
+        log.warn("{} caught an exception: ", what, exception);
+        doneFuture.complete(exception.getMessage());
+        throw new KafkaException(exception);
+    }
+
+    /**
+     * Convert a rate expressed per second to a rate expressed per the given period.
+     *
+     * @param perSec            The per-second rate.
+     * @param periodMs          The new period to use.
+     * @return                  The rate per period.  This will never be less than 1.
+     */
+    public static int perSecToPerPeriod(float perSec, long periodMs) {
+        float period = ((float) periodMs) / 1000.0f;
+        float perPeriod = perSec * period;
+        perPeriod = Math.max(1.0f, perPeriod);
+        return (int) perPeriod;
+    }
+
+    private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000;
+    private static final int CREATE_TOPICS_CALL_TIMEOUT = 90000;
+    private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10;
+
+            //Map<String, Map<Integer, List<Integer>>> topics) throws Throwable {
+
+    /**
+     * Create some Kafka topics.
+     *
+     * @param log               The logger to use.
+     * @param bootstrapServers  The bootstrap server list.
+     * @param topics            Maps topic names to partition assignments.
+     */
+    public static void createTopics(Logger log, String bootstrapServers,
+            Collection<NewTopic> topics) throws Throwable {
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            long startMs = Time.SYSTEM.milliseconds();
+            int tries = 0;
+
+            Map<String, NewTopic> newTopics = new HashMap<>();
+            for (NewTopic newTopic : topics) {
+                newTopics.put(newTopic.name(), newTopic);
+            }
+            List<String> topicsToCreate = new ArrayList<>(newTopics.keySet());
+            while (true) {
+                log.info("Attempting to create {} topics (try {})...", topicsToCreate.size(), ++tries);
+                Map<String, Future<Void>> creations = new HashMap<>();
+                while (!topicsToCreate.isEmpty()) {
+                    List<NewTopic> newTopicsBatch = new ArrayList<>();
+                    for (int i = 0; (i < MAX_CREATE_TOPICS_BATCH_SIZE) &&
+                            !topicsToCreate.isEmpty(); i++) {
+                        String topicName = topicsToCreate.remove(0);
+                        newTopicsBatch.add(newTopics.get(topicName));
+                    }
+                    creations.putAll(adminClient.createTopics(newTopicsBatch).values());
+                }
+                // We retry cases where the topic creation failed with a
+                // timeout.  This is a workaround for KAFKA-6368.
+                for (Map.Entry<String, Future<Void>> entry : creations.entrySet()) {
+                    String topicName = entry.getKey();
+                    Future<Void> future = entry.getValue();
+                    try {
+                        future.get();
+                        log.debug("Successfully created {}.", topicName);
+                    } catch (ExecutionException e) {
+                        if (e.getCause() instanceof TimeoutException) {
+                            log.warn("Timed out attempting to create {}: {}", topicName, e.getCause().getMessage());
+                            topicsToCreate.add(topicName);
+                        } else {
+                            log.warn("Failed to create {}", topicName, e.getCause());
+                            throw e.getCause();
+                        }
+                    }
+                }
+                if (topicsToCreate.isEmpty()) {
+                    break;
+                }
+                if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) {
+                    String str = "Unable to create topic(s): " +
+                            Utils.join(topicsToCreate, ", ") + " after " + tries + " attempt(s)";
+                    log.warn(str);
+                    throw new TimeoutException(str);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 547c9da..286f904 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -117,6 +117,8 @@ public final class TaskManager {
                 this.nodeManagers.put(node.name(), new NodeManager(node, this));
             }
         }
+        log.info("Created TaskManager for agent(s) on: {}",
+            Utils.join(nodeManagers.keySet(), ", "));
     }
 
     enum ManagedTaskState {

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultController.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultController.java
new file mode 100644
index 0000000..6ec803a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultController.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+import java.util.Set;
+
+public class ProcessStopFaultController implements TaskController {
+    private final Set<String> nodeNames;
+
+    public ProcessStopFaultController(Set<String> nodeNames) {
+        this.nodeNames = nodeNames;
+    }
+
+    @Override
+    public Set<String> targetNodes(Topology topology) {
+        return nodeNames;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultSpec.java
new file mode 100644
index 0000000..505baa9
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultSpec.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * The specification for a fault that creates a network partition.
+ */
+public class ProcessStopFaultSpec extends TaskSpec {
+    private final Set<String> nodeNames;
+    private final String javaProcessName;
+
+    @JsonCreator
+    public ProcessStopFaultSpec(@JsonProperty("startMs") long startMs,
+                        @JsonProperty("durationMs") long durationMs,
+                        @JsonProperty("nodeNames") List<String> nodeNames,
+                        @JsonProperty("javaProcessName") String javaProcessName) {
+        super(startMs, durationMs);
+        this.nodeNames = new HashSet<>(nodeNames);
+        this.javaProcessName = javaProcessName;
+    }
+
+    @JsonProperty
+    public Set<String> nodeNames() {
+        return nodeNames;
+    }
+
+    @JsonProperty
+    public String javaProcessName() {
+        return javaProcessName;
+    }
+
+    @Override
+    public TaskController newController(String id) {
+        return new ProcessStopFaultController(nodeNames);
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ProcessStopFaultWorker(id, javaProcessName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
new file mode 100644
index 0000000..66a8c6e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ProcessStopFaultWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(ProcessStopFaultWorker.class);
+
+    private final String id;
+
+    private final String javaProcessName;
+
+    public ProcessStopFaultWorker(String id, String javaProcessName) {
+        this.id = id;
+        this.javaProcessName = javaProcessName;
+    }
+
+    @Override
+    public void start(Platform platform, AtomicReference<String> status,
+                      KafkaFutureImpl<String> errorFuture) throws Exception {
+        log.info("Activating ProcessStopFault {}.", id);
+        sendSignals(platform, "SIGSTOP");
+    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        log.info("Deactivating ProcessStopFault {}.", id);
+        sendSignals(platform, "SIGCONT");
+    }
+
+    private void sendSignals(Platform platform, String signalName) throws Exception {
+        String jcmdOutput = platform.runCommand(new String[] {"jcmd"});
+        String[] lines = jcmdOutput.split("\n");
+        List<Integer> pids = new ArrayList<>();
+        for (String line : lines) {
+            if (line.contains(javaProcessName)) {
+                String[] components = line.split(" ");
+                try {
+                    pids.add(Integer.parseInt(components[0]));
+                } catch (NumberFormatException e) {
+                    log.error("Failed to parse process ID from line {}", e);
+                }
+            }
+        }
+        if (pids.isEmpty()) {
+            log.error("{}: no processes containing {} found to send {} to.",
+                id, javaProcessName, signalName);
+        } else {
+            log.info("{}: sending {} to {} pid(s) {}",
+                id, signalName, javaProcessName, Utils.join(pids, ", "));
+            for (Integer pid : pids) {
+                platform.runCommand(new String[] {"kill", "-" + signalName, pid.toString()});
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java
deleted file mode 100644
index c56a22a..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.workload;
-
-import org.apache.kafka.trogdor.common.Topology;
-import org.apache.kafka.trogdor.task.TaskController;
-
-import java.util.Collections;
-import java.util.Set;
-
-public class ProduceBenchController implements TaskController {
-    private final ProduceBenchSpec spec;
-
-    public ProduceBenchController(ProduceBenchSpec spec) {
-        this.spec = spec;
-    }
-
-    @Override
-    public Set<String> targetNodes(Topology topology) {
-        return Collections.singleton(spec.producerNode());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 3fb185e..9f25842 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -19,11 +19,14 @@ package org.apache.kafka.trogdor.workload;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.Topology;
 import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
 
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * The specification for a benchmark that produces messages to a set of topics.
@@ -94,7 +97,12 @@ public class ProduceBenchSpec extends TaskSpec {
 
     @Override
     public TaskController newController(String id) {
-        return new ProduceBenchController(this);
+        return new TaskController() {
+            @Override
+            public Set<String> targetNodes(Topology topology) {
+                return Collections.singleton(producerNode);
+            }
+        };
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index 0d50e3e..27e49cd 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -19,15 +19,12 @@ package org.apache.kafka.trogdor.workload;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -35,6 +32,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.common.WorkerUtils;
 import org.apache.kafka.trogdor.task.TaskWorker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +52,8 @@ import java.util.concurrent.atomic.AtomicReference;
 public class ProduceBenchWorker implements TaskWorker {
     private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
 
+    private static final short NUM_PARTITIONS = 1;
+
     private static final short REPLICATION_FACTOR = 3;
 
     private static final int MESSAGE_SIZE = 512;
@@ -72,6 +72,16 @@ public class ProduceBenchWorker implements TaskWorker {
 
     private KafkaFutureImpl<String> doneFuture;
 
+    /**
+     * Generate a topic name based on a topic number.
+     *
+     * @param topicIndex        The topic number.
+     * @return                  The topic name.
+     */
+    public static String topicIndexToName(int topicIndex) {
+        return String.format("topic%05d", topicIndex);
+    }
+
     public ProduceBenchWorker(String id, ProduceBenchSpec spec) {
         this.id = id;
         this.spec = spec;
@@ -88,22 +98,12 @@ public class ProduceBenchWorker implements TaskWorker {
             ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", false));
         this.status = status;
         this.doneFuture = doneFuture;
-        executor.submit(new ValidateSpec());
-    }
-
-    private static String topicIndexToName(int topicIndex) {
-        return String.format("topic%05d", topicIndex);
-    }
-
-    private void abort(String what, Exception e) throws KafkaException {
-        log.warn(what + " caught an exception: ", e);
-        doneFuture.completeExceptionally(new KafkaException(what + " caught an exception.", e));
-        throw new KafkaException(e);
+        executor.submit(new Prepare());
     }
 
-    public class ValidateSpec implements Callable<Void> {
+    public class Prepare implements Runnable {
         @Override
-        public Void call() throws Exception {
+        public void run() {
             try {
                 if (spec.activeTopics() == 0) {
                     throw new ConfigException("Can't have activeTopics == 0.");
@@ -113,47 +113,15 @@ public class ProduceBenchWorker implements TaskWorker {
                         "activeTopics was %d, but totalTopics was only %d.  activeTopics must " +
                             "be less than or equal to totalTopics.", spec.activeTopics(), spec.totalTopics()));
                 }
-                executor.submit(new CreateBenchmarkTopics());
-            } catch (Exception e) {
-                abort("ValidateSpec", e);
-            }
-            return null;
-        }
-    }
-
-    public class CreateBenchmarkTopics implements Callable<Void> {
-        private final static int MAX_BATCH_SIZE = 10;
-
-        @Override
-        public Void call() throws Exception {
-            try {
-                Properties props = new Properties();
-                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
-                try (AdminClient adminClient = AdminClient.create(props)) {
-                    List<String> topicsToCreate = new ArrayList<>();
-                    for (int i = 0; i < spec.totalTopics(); i++) {
-                        topicsToCreate.add(topicIndexToName(i));
-                    }
-                    log.info("Creating " + spec.totalTopics() + " topics...");
-                    List<Future<Void>> futures = new ArrayList<>();
-                    while (!topicsToCreate.isEmpty()) {
-                        List<NewTopic> newTopics = new ArrayList<>();
-                        for (int i = 0; (i < MAX_BATCH_SIZE) && !topicsToCreate.isEmpty(); i++) {
-                            String topic = topicsToCreate.remove(0);
-                            newTopics.add(new NewTopic(topic, 1, REPLICATION_FACTOR));
-                        }
-                        futures.add(adminClient.createTopics(newTopics).all());
-                    }
-                    for (Future<Void> future : futures) {
-                        future.get();
-                    }
-                    log.info("Successfully created " + spec.totalTopics() + " topics.");
+                List<NewTopic> newTopics = new ArrayList<>();
+                for (int i = 0; i < spec.totalTopics(); i++) {
+                    newTopics.add(new NewTopic(topicIndexToName(i), NUM_PARTITIONS, REPLICATION_FACTOR));
                 }
+                WorkerUtils.createTopics(log, spec.bootstrapServers(), newTopics);
                 executor.submit(new SendRecords());
-            } catch (Exception e) {
-                abort("CreateBenchmarkTopics", e);
+            } catch (Throwable e) {
+                WorkerUtils.abort(log, "Prepare", e, doneFuture);
             }
-            return null;
         }
     }
 
@@ -177,13 +145,6 @@ public class ProduceBenchWorker implements TaskWorker {
         }
     }
 
-    private int perSecToPerPeriod(float perSec, long periodMs) {
-        float period = ((float) periodMs) / 1000.0f;
-        float perPeriod = perSec * period;
-        perPeriod = Math.max(1.0f, perPeriod);
-        return (int) perPeriod;
-    }
-
     /**
      * A subclass of Throttle which flushes the Producer right before the throttle injects a delay.
      * This avoids including throttling latency in latency measurements.
@@ -217,7 +178,7 @@ public class ProduceBenchWorker implements TaskWorker {
 
         SendRecords() {
             this.histogram = new Histogram(5000);
-            int perPeriod = perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
+            int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
                 new StatusUpdater(histogram), 1, 1, TimeUnit.MINUTES);
             Properties props = new Properties();
@@ -251,7 +212,7 @@ public class ProduceBenchWorker implements TaskWorker {
                     producer.close();
                 }
             } catch (Exception e) {
-                abort("SendRecords", e);
+                WorkerUtils.abort(log, "SendRecords", e, doneFuture);
             } finally {
                 statusUpdaterFuture.cancel(false);
                 new StatusUpdater(histogram).run();
@@ -274,10 +235,7 @@ public class ProduceBenchWorker implements TaskWorker {
 
         StatusUpdater(Histogram histogram) {
             this.histogram = histogram;
-            this.percentiles = new float[3];
-            this.percentiles[0] = 0.50f;
-            this.percentiles[1] = 0.95f;
-            this.percentiles[2] = 0.99f;
+            this.percentiles = new float[] {0.50f, 0.95f, 0.99f};
         }
 
         @Override
@@ -291,7 +249,7 @@ public class ProduceBenchWorker implements TaskWorker {
                 String statusDataString = JsonUtil.toJsonString(statusData);
                 status.set(statusDataString);
             } catch (Exception e) {
-                abort("StatusUpdater", e);
+                WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
new file mode 100644
index 0000000..9031c45
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class RoundTripWorker implements TaskWorker {
+    private static final int THROTTLE_PERIOD_MS = 100;
+
+    private static final int VALUE_SIZE = 512;
+
+    private static final int LOG_INTERVAL_MS = 5000;
+
+    private static final int LOG_NUM_MESSAGES = 10;
+
+    private static final String TOPIC_NAME = "round_trip_topic";
+
+    private static final Logger log = LoggerFactory.getLogger(RoundTripWorker.class);
+
+    private final ToReceiveTracker toReceiveTracker = new ToReceiveTracker();
+
+    private final String id;
+
+    private final RoundTripWorkloadSpec spec;
+
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private ExecutorService executor;
+
+    private KafkaFutureImpl<String> doneFuture;
+
+    private KafkaProducer<String, byte[]> producer;
+
+    private KafkaConsumer<String, byte[]> consumer;
+
+    private CountDownLatch unackedSends;
+
+    public RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    @Override
+    public void start(Platform platform, AtomicReference<String> status,
+                      KafkaFutureImpl<String> doneFuture) throws Exception {
+        // A RoundTripWorker instance may only be started once.
+        if (!running.compareAndSet(false, true)) {
+            throw new IllegalStateException("RoundTripWorker is already running.");
+        }
+        log.info("{}: Activating RoundTripWorker.", id);
+        this.executor = Executors.newCachedThreadPool(
+            ThreadUtils.createThreadFactory("RoundTripWorker%d", false));
+        this.doneFuture = doneFuture;
+        // The producer and consumer are created later, by the runnables
+        // submitted to the executor, not on this thread.
+        this.producer = null;
+        this.consumer = null;
+        // Counts down once per message when its send is acknowledged.
+        this.unackedSends = new CountDownLatch(spec.maxMessages());
+        executor.submit(new Prepare());
+    }
+
+    class Prepare implements Runnable {
+        @Override
+        public void run() {
+            try {
+                if (spec.targetMessagesPerSec() <= 0) {
+                    throw new ConfigException("Can't have targetMessagesPerSec <= 0.");
+                }
+                if ((spec.partitionAssignments() == null) || spec.partitionAssignments().isEmpty()) {
+                    throw new ConfigException("Invalid null or empty partitionAssignments.");
+                }
+                WorkerUtils.createTopics(log, spec.bootstrapServers(),
+                    Collections.singletonList(new NewTopic(TOPIC_NAME, spec.partitionAssignments())));
+                executor.submit(new ProducerRunnable());
+                executor.submit(new ConsumerRunnable());
+            } catch (Throwable e) {
+                WorkerUtils.abort(log, "Prepare", e, doneFuture);
+            }
+        }
+    }
+
+    private static class ToSendTrackerResult {
+        final int index;
+        final boolean firstSend;
+
+        ToSendTrackerResult(int index, boolean firstSend) {
+            this.index = index;
+            this.firstSend = firstSend;
+        }
+    }
+
+    private static class ToSendTracker {
+        private final int maxMessages;
+        private final List<Integer> failed = new ArrayList<>();
+        private int frontier = 0;
+
+        ToSendTracker(int maxMessages) {
+            this.maxMessages = maxMessages;
+        }
+
+        synchronized void addFailed(int index) {
+            failed.add(index);
+        }
+
+        synchronized ToSendTrackerResult next() {
+            if (failed.isEmpty()) {
+                if (frontier >= maxMessages) {
+                    return null;
+                } else {
+                    return new ToSendTrackerResult(frontier++, true);
+                }
+            } else {
+                return new ToSendTrackerResult(failed.remove(0), false);
+            }
+        }
+    }
+
+    /**
+     * The producer loop for the round trip workload.  Sends every message
+     * index exactly until acknowledged, re-queueing failed sends for retry.
+     */
+    class ProducerRunnable implements Runnable {
+        // Limits the send rate to spec.targetMessagesPerSec().
+        private final Throttle throttle;
+
+        ProducerRunnable() {
+            Properties props = new Properties();
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
+            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 4 * 16 * 1024L);
+            // Fail sends after 1s if the broker is unreachable, so that they
+            // are recorded as failures and retried rather than blocking.
+            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000L);
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer." + id);
+            // acks=all: only count a send as acknowledged once fully replicated.
+            props.put(ProducerConfig.ACKS_CONFIG, "all");
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
+            producer = new KafkaProducer<>(props, new StringSerializer(),
+                new ByteArraySerializer());
+            int perPeriod = WorkerUtils.
+                perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
+            this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
+        }
+
+        @Override
+        public void run() {
+            byte[] value = new byte[VALUE_SIZE];
+            final ToSendTracker toSendTracker = new ToSendTracker(spec.maxMessages());
+            long messagesSent = 0;
+            long uniqueMessagesSent = 0;
+            log.debug("{}: Starting RoundTripWorker#ProducerRunnable.", id);
+            try {
+                while (true) {
+                    final ToSendTrackerResult result = toSendTracker.next();
+                    // null means every index has been handed out and acknowledged
+                    // or queued; nothing left to send.
+                    if (result == null) {
+                        break;
+                    }
+                    throttle.increment();
+                    final int messageIndex = result.index;
+                    if (result.firstSend) {
+                        // Register the index as pending before sending, so the
+                        // consumer can never observe an unregistered index.
+                        toReceiveTracker.addPending(messageIndex);
+                        uniqueMessagesSent++;
+                    }
+                    messagesSent++;
+                    // Every message goes to partition 0; the key is the index.
+                    ProducerRecord<String, byte[]> record =
+                        new ProducerRecord<>(TOPIC_NAME, 0, String.valueOf(messageIndex), value);
+                    producer.send(record, new Callback() {
+                        @Override
+                        public void onCompletion(RecordMetadata metadata, Exception exception) {
+                            if (exception == null) {
+                                unackedSends.countDown();
+                            } else {
+                                // Queue the index for another send attempt.
+                                log.info("{}: Got exception when sending message {}: {}",
+                                    id, messageIndex, exception.getMessage());
+                                toSendTracker.addFailed(messageIndex);
+                            }
+                        }
+                    });
+                }
+            } catch (Throwable e) {
+                WorkerUtils.abort(log, "ProducerRunnable", e, doneFuture);
+            } finally {
+                log.info("{}: ProducerRunnable is exiting.  messagesSent={}; uniqueMessagesSent={}; " +
+                        "ackedSends={}.", id, messagesSent, uniqueMessagesSent,
+                        spec.maxMessages() - unackedSends.getCount());
+            }
+        }
+    }
+
+    private class ToReceiveTracker {
+        private final TreeSet<Integer> pending = new TreeSet<>();
+
+        synchronized void addPending(int messageIndex) {
+            pending.add(messageIndex);
+        }
+
+        synchronized boolean removePending(int messageIndex) {
+            return pending.remove(messageIndex);
+        }
+
+        void log() {
+            int numToReceive;
+            List<Integer> list = new ArrayList<>(LOG_NUM_MESSAGES);
+            synchronized (this) {
+                numToReceive = pending.size();
+                for (Iterator<Integer> iter = pending.iterator();
+                        iter.hasNext() && (list.size() < LOG_NUM_MESSAGES); ) {
+                    Integer i = iter.next();
+                    list.add(i);
+                }
+            }
+            log.info("{}: consumer waiting for {} message(s), starting with: {}",
+                id, numToReceive, Utils.join(list, ", "));
+        }
+    }
+
    /**
     * Polls the round-trip topic until every unique message produced by this
     * worker has been received, then completes {@code doneFuture}.
     */
    class ConsumerRunnable implements Runnable {
        private final Properties props;

        ConsumerRunnable() {
            this.props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-consumer-group-1");
            // Start from the beginning of the topic so messages produced before
            // this consumer joined are still counted.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // NOTE(review): request timeout (105s) is deliberately larger than
            // max.poll.interval.ms (100s), presumably so the poll interval
            // expires first — confirm against client timeout semantics.
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
            // Assigns the enclosing worker's consumer field (not a local), so
            // stop() can close it from another thread.
            consumer = new KafkaConsumer<>(props, new StringDeserializer(),
                new ByteArrayDeserializer());
            consumer.subscribe(Collections.singleton(TOPIC_NAME));
        }

        @Override
        public void run() {
            long uniqueMessagesReceived = 0;
            long messagesReceived = 0;
            long pollInvoked = 0;
            log.debug("{}: Starting RoundTripWorker#ConsumerRunnable.", id);
            try {
                long lastLogTimeMs = Time.SYSTEM.milliseconds();
                while (true) {
                    try {
                        pollInvoked++;
                        ConsumerRecords<String, byte[]> records = consumer.poll(50);
                        for (ConsumerRecord<String, byte[]> record : records.records(TOPIC_NAME)) {
                            // The record key is the message index assigned by
                            // the producer side of this worker.
                            int messageIndex = Integer.parseInt(record.key());
                            messagesReceived++;
                            // removePending() returns true only the first time
                            // an index is seen, so duplicates (at-least-once
                            // delivery) do not inflate the unique count.
                            if (toReceiveTracker.removePending(messageIndex)) {
                                uniqueMessagesReceived++;
                                if (uniqueMessagesReceived >= spec.maxMessages()) {
                                    log.info("{}: Consumer received the full count of {} unique messages.  " +
                                        "Waiting for all sends to be acked...", id, spec.maxMessages());
                                    // Block until the producer callback has
                                    // acked every send before declaring success.
                                    unackedSends.await();
                                    log.info("{}: all sends have been acked.", id);
                                    doneFuture.complete("");
                                    return;
                                }
                            }
                        }
                        // Periodically log progress, at most once per
                        // LOG_INTERVAL_MS.
                        long curTimeMs = Time.SYSTEM.milliseconds();
                        if (curTimeMs > lastLogTimeMs + LOG_INTERVAL_MS) {
                            toReceiveTracker.log();
                            lastLogTimeMs = curTimeMs;
                        }
                    } catch (WakeupException e) {
                        // Expected when another thread wakes the consumer
                        // (e.g. during shutdown); just loop again.
                        log.debug("{}: Consumer got WakeupException", id, e);
                    } catch (TimeoutException e) {
                        // Transient broker timeouts are retried by looping.
                        log.debug("{}: Consumer got TimeoutException", id, e);
                    }
                }
            } catch (Throwable e) {
                // Any other failure aborts the whole task via the done future.
                WorkerUtils.abort(log, "ConsumerRunnable", e, doneFuture);
            } finally {
                log.info("{}: ConsumerRunnable is exiting.  Invoked poll {} time(s).  " +
                    "messagesReceived = {}; uniqueMessagesReceived = {}.",
                    id, pollInvoked, messagesReceived, uniqueMessagesReceived);
            }
        }
    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        if (!running.compareAndSet(true, false)) {
+            throw new IllegalStateException("ProduceBenchWorker is not running.");
+        }
+        log.info("{}: Deactivating RoundTripWorkloadWorker.", id);
+        doneFuture.complete("");
+        executor.shutdownNow();
+        executor.awaitTermination(1, TimeUnit.DAYS);
+        Utils.closeQuietly(consumer, "consumer");
+        Utils.closeQuietly(producer, "producer");
+        this.consumer = null;
+        this.producer = null;
+        this.unackedSends = null;
+        this.executor = null;
+        this.doneFuture = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
new file mode 100644
index 0000000..24c9e77
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Set;
+
+/**
+ * The specification for a workload that sends messages to a broker and then
+ * reads them back.
+ */
/**
 * The specification for a workload that sends messages to a broker and then
 * reads them back.
 *
 * NOTE(review): property names and getter order are part of the wire format —
 * canonical JSON round-trips are asserted elsewhere — so do not rename or
 * reorder the Jackson-annotated members.
 */
public class RoundTripWorkloadSpec extends TaskSpec {
    // Trogdor node that runs the producer/consumer pair.
    private final String clientNode;
    // Kafka bootstrap.servers string for both clients.
    private final String bootstrapServers;
    // Producer throttle target, in messages per second.
    private final int targetMessagesPerSec;
    // Maps partition number -> replica broker ids for the test topic.
    private final NavigableMap<Integer, List<Integer>> partitionAssignments;
    // Total number of unique messages to send and then receive back.
    private final int maxMessages;

    /**
     * @param startMs              task start time in wall-clock ms (see TaskSpec).
     * @param durationMs           maximum task duration in ms (see TaskSpec).
     * @param clientNode           node on which the workload runs.
     * @param bootstrapServers     bootstrap servers for the Kafka clients.
     * @param targetMessagesPerSec target produce rate, messages/sec.
     * @param partitionAssignments partition -> broker-id assignments for topic creation.
     * @param maxMessages          number of unique messages to round-trip.
     */
    @JsonCreator
    public RoundTripWorkloadSpec(@JsonProperty("startMs") long startMs,
             @JsonProperty("durationMs") long durationMs,
             @JsonProperty("clientNode") String clientNode,
             @JsonProperty("bootstrapServers") String bootstrapServers,
             @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
             @JsonProperty("partitionAssignments") NavigableMap<Integer, List<Integer>> partitionAssignments,
             @JsonProperty("maxMessages") int maxMessages) {
        super(startMs, durationMs);
        this.clientNode = clientNode;
        this.bootstrapServers = bootstrapServers;
        this.targetMessagesPerSec = targetMessagesPerSec;
        // NOTE(review): stored by reference, not defensively copied — callers
        // must not mutate the map after construction.
        this.partitionAssignments = partitionAssignments;
        this.maxMessages = maxMessages;
    }

    @JsonProperty
    public String clientNode() {
        return clientNode;
    }

    @JsonProperty
    public String bootstrapServers() {
        return bootstrapServers;
    }

    @JsonProperty
    public int targetMessagesPerSec() {
        return targetMessagesPerSec;
    }

    @JsonProperty
    public NavigableMap<Integer, List<Integer>> partitionAssignments() {
        return partitionAssignments;
    }

    @JsonProperty
    public int maxMessages() {
        return maxMessages;
    }

    /** The workload targets only the configured client node. */
    @Override
    public TaskController newController(String id) {
        return new TaskController() {
            @Override
            public Set<String> targetNodes(Topology topology) {
                return Collections.singleton(clientNode);
            }
        };
    }

    @Override
    public TaskWorker newTaskWorker(String id) {
        return new RoundTripWorker(id, this);
    }
}

http://git-wip-us.apache.org/repos/asf/kafka/blob/760d86a9/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
new file mode 100644
index 0000000..abd7e62
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TaskSpecTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    @Test
+    public void testTaskSpecSerialization() throws Exception {
+        try {
+            JsonUtil.JSON_SERDE.readValue(
+                "{\"startMs\":123,\"durationMs\":456,\"exitMs\":1000,\"error\":\"foo\"}",
+                SampleTaskSpec.class);
+            fail("Expected InvalidTypeIdException because type id is missing.");
+        } catch (InvalidTypeIdException e) {
+        }
+        String inputJson = "{\"class\":\"org.apache.kafka.trogdor.task.SampleTaskSpec\"," +
+            "\"startMs\":123,\"durationMs\":456,\"exitMs\":1000,\"error\":\"foo\"}";
+        SampleTaskSpec spec = JsonUtil.JSON_SERDE.readValue(inputJson, SampleTaskSpec.class);
+        assertEquals(123, spec.startMs());
+        assertEquals(456, spec.durationMs());
+        assertEquals(1000, spec.exitMs());
+        assertEquals("foo", spec.error());
+        String outputJson = JsonUtil.toJsonString(spec);
+        assertEquals(inputJson, outputJson);
+    }
+};


Mime
View raw message