kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6688. The Trogdor coordinator should track task statuses (#4737)
Date Sun, 08 Apr 2018 08:35:43 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 40183e3  KAFKA-6688. The Trogdor coordinator should track task statuses (#4737)
40183e3 is described below

commit 40183e31567795d4d0f2b836294bc5d5fac2a56b
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Sun Apr 8 01:35:33 2018 -0700

    KAFKA-6688. The Trogdor coordinator should track task statuses (#4737)
    
    Reviewers: Anna Povzner <anna@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../apache/kafka/trogdor/agent/WorkerManager.java  |  10 +-
 .../kafka/trogdor/coordinator/NodeManager.java     |  47 +++----
 .../kafka/trogdor/coordinator/TaskManager.java     | 147 +++++++++++++--------
 .../kafka/trogdor/fault/KiboshFaultWorker.java     |  13 +-
 .../trogdor/fault/NetworkPartitionFaultWorker.java |  12 +-
 .../trogdor/fault/ProcessStopFaultWorker.java      |  12 +-
 .../org/apache/kafka/trogdor/rest/TaskDone.java    |   7 +-
 .../org/apache/kafka/trogdor/rest/TaskPending.java |   3 +-
 .../org/apache/kafka/trogdor/rest/TaskRunning.java |   6 +-
 .../org/apache/kafka/trogdor/rest/TaskState.java   |  12 +-
 .../apache/kafka/trogdor/rest/TaskStopping.java    |   6 +-
 .../org/apache/kafka/trogdor/rest/WorkerDone.java  |  10 +-
 .../apache/kafka/trogdor/rest/WorkerReceiving.java |   7 +
 .../apache/kafka/trogdor/rest/WorkerRunning.java   |  10 +-
 .../apache/kafka/trogdor/rest/WorkerStarting.java  |   7 +
 .../org/apache/kafka/trogdor/rest/WorkerState.java |   5 +-
 .../apache/kafka/trogdor/rest/WorkerStopping.java  |  10 +-
 .../AgentWorkerStatusTracker.java}                 |  29 ++--
 .../apache/kafka/trogdor/task/NoOpTaskWorker.java  |  10 +-
 .../org/apache/kafka/trogdor/task/TaskWorker.java  |   6 +-
 .../WorkerStatusTracker.java}                      |  20 +--
 .../kafka/trogdor/workload/ProduceBenchWorker.java |  52 +++++---
 .../kafka/trogdor/workload/RoundTripWorker.java    |   4 +-
 .../org/apache/kafka/trogdor/agent/AgentTest.java  |  57 ++++----
 .../trogdor/common/JsonSerializationTest.java      |   2 +-
 .../kafka/trogdor/coordinator/CoordinatorTest.java | 118 +++++++++++++++--
 .../apache/kafka/trogdor/task/SampleTaskSpec.java  |  15 ++-
 .../kafka/trogdor/task/SampleTaskWorker.java       |  15 ++-
 .../apache/kafka/trogdor/task/TaskSpecTest.java    |   4 +-
 29 files changed, 441 insertions(+), 215 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index cda7773..7c8de6d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.rest.WorkerDone;
@@ -29,6 +30,7 @@ import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.rest.WorkerStarting;
 import org.apache.kafka.trogdor.rest.WorkerStopping;
 import org.apache.kafka.trogdor.rest.WorkerState;
+import org.apache.kafka.trogdor.task.AgentWorkerStatusTracker;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
 import org.slf4j.Logger;
@@ -43,7 +45,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 public final class WorkerManager {
     private static final Logger log = LoggerFactory.getLogger(WorkerManager.class);
@@ -190,7 +191,7 @@ public final class WorkerManager {
         /**
          * The worker status.
          */
-        private final AtomicReference<String> status = new AtomicReference<>("");
+        private final AgentWorkerStatusTracker status = new AgentWorkerStatusTracker();
 
         /**
          * The time when this task was started.
@@ -293,6 +294,8 @@ public final class WorkerManager {
             haltFuture.thenApply(new KafkaFuture.BaseFunction<String, Void>() {
                 @Override
                 public Void apply(String errorString) {
+                    if (errorString == null)
+                        errorString = "";
                     if (errorString.isEmpty()) {
                         log.info("{}: Worker {} is halting.", nodeName, id);
                     } else {
@@ -306,8 +309,9 @@ public final class WorkerManager {
             try {
                 worker.taskWorker.start(platform, worker.status, haltFuture);
             } catch (Exception e) {
+                log.info("{}: Worker {} start() exception", nodeName, id, e);
                 stateChangeExecutor.submit(new HandleWorkerHalting(worker,
-                    "worker.start() exception: " + e.getMessage(), true));
+                    "worker.start() exception: " + Utils.stackTrace(e), true));
             }
             stateChangeExecutor.submit(new FinishCreatingWorker(worker));
         }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index 0129007..91ef9c2 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -49,7 +49,6 @@ import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerReceiving;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.rest.WorkerStarting;
@@ -192,6 +191,9 @@ public final class NodeManager {
                     // agents going down?
                     return;
                 }
+                if (log.isTraceEnabled()) {
+                    log.trace("{}: got heartbeat status {}", node.name(), agentStatus);
+                }
                 // Identify workers which we think should be running, but which do not appear
                 // in the agent's response.  We need to send startWorker requests for these.
                 for (Map.Entry<String, ManagedWorker> entry : workers.entrySet()) {
@@ -203,40 +205,31 @@ public final class NodeManager {
                         }
                     }
                 }
-                // Identify tasks which are running, but which we don't know about.
-                // Add these to the NodeManager as tasks that should not be running.
                 for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
                     String id = entry.getKey();
                     WorkerState state = entry.getValue();
-                    if (!workers.containsKey(id)) {
+                    ManagedWorker worker = workers.get(id);
+                    if (worker == null) {
+                        // Identify tasks which are running, but which we don't know about.
+                        // Add these to the NodeManager as tasks that should not be running.
                         log.warn("{}: scheduling unknown worker {} for stopping.", node.name(), id);
                         workers.put(id, new ManagedWorker(id, state.spec(), false, state));
-                    }
-                }
-                // Handle workers which need to be stopped.  Handle workers which have newly completed.
-                for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
-                    String id = entry.getKey();
-                    WorkerState state = entry.getValue();
-                    ManagedWorker worker = workers.get(id);
-                    if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
-                        if (!worker.shouldRun) {
-                            worker.tryStop();
-                        }
-                    } else if (state instanceof WorkerDone) {
-                        if (!(worker.state instanceof WorkerDone)) {
-                            WorkerDone workerDoneState =  (WorkerDone) state;
-                            String error = workerDoneState.error();
-                            if (error.isEmpty()) {
-                                log.info("{}: Worker {} finished with status '{}'",
-                                    node.name(), id, workerDoneState.status());
-                            } else {
-                                log.warn("{}: Worker {} finished with error '{}' and status '{}'",
-                                    node.name(), id, error, workerDoneState.status());
+                    } else {
+                        // Handle workers which need to be stopped.
+                        if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
+                            if (!worker.shouldRun) {
+                                worker.tryStop();
                             }
-                            taskManager.handleWorkerCompletion(node.name(), worker.id, error);
+                        }
+                        // Notify the TaskManager if the worker state has changed.
+                        if (worker.state.equals(state)) {
+                            log.debug("{}: worker state is still {}", node.name(), worker.state);
+                        } else {
+                            log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state);
+                            worker.state = state;
+                            taskManager.updateWorkerState(node.name(), worker.id, state);
                         }
                     }
-                    worker.state = state;
                 }
             } catch (Throwable e) {
                 log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index d88e1d5..7e19c8b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -17,10 +17,14 @@
 
 package org.apache.kafka.trogdor.coordinator;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
@@ -31,13 +35,15 @@ import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TaskStopping;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.rest.WorkerDone;
+import org.apache.kafka.trogdor.rest.WorkerReceiving;
+import org.apache.kafka.trogdor.rest.WorkerState;
 import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -172,16 +178,9 @@ public final class TaskManager {
         private Future<?> startFuture = null;
 
         /**
-         * The name of the worker nodes involved with this task.
-         * Null if the task is not running.
+         * The states of the workers involved with this task.
          */
-        private Set<String> workers = null;
-
-        /**
-         * The names of the worker nodes which are still running this task.
-         * Null if the task is not running.
-         */
-        private Set<String> activeWorkers = null;
+        public Map<String, WorkerState> workerStates = new TreeMap<>();
 
         /**
          * If this is non-empty, a message describing how this task failed.
@@ -241,14 +240,39 @@ public final class TaskManager {
                 case PENDING:
                     return new TaskPending(spec);
                 case RUNNING:
-                    return new TaskRunning(spec, startedMs);
+                    return new TaskRunning(spec, startedMs, getCombinedStatus(workerStates));
                 case STOPPING:
-                    return new TaskStopping(spec, startedMs);
+                    return new TaskStopping(spec, startedMs, getCombinedStatus(workerStates));
                 case DONE:
-                    return new TaskDone(spec, startedMs, doneMs, error, cancelled);
+                    return new TaskDone(spec, startedMs, doneMs, error, cancelled, getCombinedStatus(workerStates));
             }
             throw new RuntimeException("unreachable");
         }
+
+        TreeSet<String> activeWorkers() {
+            TreeSet<String> workerNames = new TreeSet<>();
+            for (Map.Entry<String, WorkerState> entry : workerStates.entrySet()) {
+                if (!entry.getValue().done()) {
+                    workerNames.add(entry.getKey());
+                }
+            }
+            return workerNames;
+        }
+    }
+
+    private static final JsonNode getCombinedStatus(Map<String, WorkerState> states) {
+        if (states.size() == 1) {
+            return states.values().iterator().next().status();
+        } else {
+            ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
+            for (Map.Entry<String, WorkerState> entry : states.entrySet()) {
+                JsonNode node = entry.getValue().status();
+                if (node != null) {
+                    objectNode.set(entry.getKey(), node);
+                }
+            }
+            return objectNode;
+        }
     }
 
     /**
@@ -349,10 +373,8 @@ public final class TaskManager {
             log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", "));
             task.state = ManagedTaskState.RUNNING;
             task.startedMs = time.milliseconds();
-            task.workers = nodeNames;
-            task.activeWorkers = new HashSet<>();
-            for (String workerName : task.workers) {
-                task.activeWorkers.add(workerName);
+            for (String workerName : nodeNames) {
+                task.workerStates.put(workerName, new WorkerReceiving(task.spec));
                 nodeManagers.get(workerName).createWorker(task.id, task.spec);
             }
             return null;
@@ -398,15 +420,16 @@ public final class TaskManager {
                     break;
                 case RUNNING:
                     task.cancelled = true;
-                    if (task.activeWorkers.size() == 0) {
+                    TreeSet<String> activeWorkers = task.activeWorkers();
+                    if (activeWorkers.isEmpty()) {
                         log.info("Task {} is now complete with error: {}", id, task.error);
                         task.doneMs = time.milliseconds();
                         task.state = ManagedTaskState.DONE;
                     } else {
-                        for (String workerName : task.activeWorkers) {
+                        for (String workerName : activeWorkers) {
                             nodeManagers.get(workerName).stopWorker(id);
                         }
-                        log.info("Cancelling task {} on worker(s): {}", id, Utils.join(task.activeWorkers, ", "));
+                        log.info("Cancelling task {} on worker(s): {}", id, Utils.join(activeWorkers, ", "));
                         task.state = ManagedTaskState.STOPPING;
                     }
                     break;
@@ -422,66 +445,80 @@ public final class TaskManager {
     }
 
     /**
-     * A callback NodeManager makes to indicate that a worker has completed.
-     * The task will transition to DONE once all workers are done.
+     * Update the state of a particular agent's worker.
      *
-     * @param nodeName      The node name.
+     * @param nodeName      The node where the agent is running.
      * @param id            The worker name.
-     * @param error         An empty string if there is no error, or an error string.
+     * @param state         The worker state.
      */
-    public void handleWorkerCompletion(String nodeName, String id, String error) {
-        executor.submit(new HandleWorkerCompletion(nodeName, id, error));
+    public void updateWorkerState(String nodeName, String id, WorkerState state) {
+        executor.submit(new UpdateWorkerState(nodeName, id, state));
     }
 
-    class HandleWorkerCompletion implements Callable<Void> {
+    class UpdateWorkerState implements Callable<Void> {
         private final String nodeName;
         private final String id;
-        private final String error;
+        private final WorkerState state;
 
-        HandleWorkerCompletion(String nodeName, String id, String error) {
+        UpdateWorkerState(String nodeName, String id, WorkerState state) {
             this.nodeName = nodeName;
             this.id = id;
-            this.error = error;
+            this.state = state;
         }
 
         @Override
         public Void call() throws Exception {
             ManagedTask task = tasks.get(id);
             if (task == null) {
-                log.error("Can't handle completion of unknown worker {} on node {}",
+                log.error("Can't update worker state unknown worker {} on node {}",
                     id, nodeName);
                 return null;
             }
-            if ((task.state == ManagedTaskState.PENDING) || (task.state == ManagedTaskState.DONE)) {
-                log.error("Task {} got unexpected worker completion from {} while " +
-                    "in {} state.", id, nodeName, task.state);
-                return null;
-            }
-            boolean broadcastStop = false;
-            if (task.state == ManagedTaskState.RUNNING) {
-                task.state = ManagedTaskState.STOPPING;
-                broadcastStop = true;
-            }
-            task.maybeSetError(error);
-            task.activeWorkers.remove(nodeName);
-            if (task.activeWorkers.size() == 0) {
-                task.doneMs = time.milliseconds();
-                task.state = ManagedTaskState.DONE;
-                log.info("Task {} is now complete on {} with error: {}", id,
-                    Utils.join(task.workers, ", "),
-                    task.error.isEmpty() ? "(none)" : task.error);
-            } else if (broadcastStop) {
-                log.info("Node {} stopped.  Stopping task {} on worker(s): {}",
-                    id, Utils.join(task.activeWorkers, ", "));
-                for (String workerName : task.activeWorkers) {
-                    nodeManagers.get(workerName).stopWorker(id);
-                }
+            WorkerState prevState = task.workerStates.get(nodeName);
+            log.debug("Task {}: Updating worker state for {} from {} to {}.",
+                id, nodeName, prevState, state);
+            task.workerStates.put(nodeName, state);
+            if (state.done() && (!prevState.done())) {
+                handleWorkerCompletion(task, nodeName, (WorkerDone) state);
             }
             return null;
         }
     }
 
     /**
+     * Handle a worker being completed.
+     *
+     * @param task      The task that owns the worker.
+     * @param nodeName  The name of the node on which the worker is running.
+     * @param state     The worker state.
+     */
+    private void handleWorkerCompletion(ManagedTask task, String nodeName, WorkerDone state) {
+        if (state.error().isEmpty()) {
+            log.info("{}: Worker {} finished with status '{}'",
+                nodeName, task.id, JsonUtil.toJsonString(state.status()));
+        } else {
+            log.warn("{}: Worker {} finished with error '{}' and status '{}'",
+                nodeName, task.id, state.error(), JsonUtil.toJsonString(state.status()));
+            task.maybeSetError(state.error());
+        }
+        if (task.activeWorkers().isEmpty()) {
+            task.doneMs = time.milliseconds();
+            task.state = ManagedTaskState.DONE;
+            log.info("{}: Task {} is now complete on {} with error: {}",
+                nodeName, task.id, Utils.join(task.workerStates.keySet(), ", "),
+                task.error.isEmpty() ? "(none)" : task.error);
+        } else if ((task.state == ManagedTaskState.RUNNING) && (!task.error.isEmpty())) {
+            TreeSet<String> activeWorkers = task.activeWorkers();
+            log.info("{}: task {} stopped with error {}.  Stopping worker(s): {}",
+                nodeName, task.id, task.error, Utils.join(activeWorkers, ", "));
+            task.state = ManagedTaskState.STOPPING;
+            for (String workerName : activeWorkers) {
+                nodeManagers.get(workerName).stopWorker(task.id);
+            }
+        }
+    }
+
+    /**
      * Get information about the tasks being managed.
      */
     public TasksResponse tasks(TasksRequest request) throws ExecutionException, InterruptedException {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
index 629d15e..97934a8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
@@ -17,15 +17,15 @@
 
 package org.apache.kafka.trogdor.fault;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.fault.Kibosh.KiboshFaultSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 public class KiboshFaultWorker implements TaskWorker {
     private static final Logger log = LoggerFactory.getLogger(KiboshFaultWorker.class);
 
@@ -35,6 +35,8 @@ public class KiboshFaultWorker implements TaskWorker {
 
     private final String mountPath;
 
+    private WorkerStatusTracker status;
+
     public KiboshFaultWorker(String id, KiboshFaultSpec spec, String mountPath) {
         this.id = id;
         this.spec = spec;
@@ -42,15 +44,20 @@ public class KiboshFaultWorker implements TaskWorker {
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> errorFuture) throws Exception {
         log.info("Activating {} {}: {}.", spec.getClass().getSimpleName(), id, spec);
+        this.status = status;
+        this.status.update(new TextNode("Adding fault " + id));
         Kibosh.INSTANCE.addFault(mountPath, spec);
+        this.status.update(new TextNode("Added fault " + id));
     }
 
     @Override
     public void stop(Platform platform) throws Exception {
         log.info("Deactivating {} {}: {}.", spec.getClass().getSimpleName(), id, spec);
+        this.status.update(new TextNode("Removing fault " + id));
         Kibosh.INSTANCE.removeFault(mountPath, spec);
+        this.status.update(new TextNode("Removed fault " + id));
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
index 787c5e0..1b99a93 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
@@ -17,11 +17,13 @@
 
 package org.apache.kafka.trogdor.fault;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.Topology;
 import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,7 +31,6 @@ import java.net.InetAddress;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class NetworkPartitionFaultWorker implements TaskWorker {
     private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFaultWorker.class);
@@ -38,22 +39,29 @@ public class NetworkPartitionFaultWorker implements TaskWorker {
 
     private final List<Set<String>> partitionSets;
 
+    private WorkerStatusTracker status;
+
     public NetworkPartitionFaultWorker(String id, List<Set<String>> partitionSets) {
         this.id = id;
         this.partitionSets = partitionSets;
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> errorFuture) throws Exception {
         log.info("Activating NetworkPartitionFault {}.", id);
+        this.status = status;
+        this.status.update(new TextNode("creating network partition " + id));
         runIptablesCommands(platform, "-A");
+        this.status.update(new TextNode("created network partition " + id));
     }
 
     @Override
     public void stop(Platform platform) throws Exception {
         log.info("Deactivating NetworkPartitionFault {}.", id);
+        this.status.update(new TextNode("removing network partition " + id));
         runIptablesCommands(platform, "-D");
+        this.status.update(new TextNode("removed network partition " + id));
     }
 
     private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
index 66a8c6e..d30eaf7 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
@@ -17,16 +17,17 @@
 
 package org.apache.kafka.trogdor.fault;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class ProcessStopFaultWorker implements TaskWorker {
     private static final Logger log = LoggerFactory.getLogger(ProcessStopFaultWorker.class);
@@ -35,22 +36,29 @@ public class ProcessStopFaultWorker implements TaskWorker {
 
     private final String javaProcessName;
 
+    private WorkerStatusTracker status;
+
     public ProcessStopFaultWorker(String id, String javaProcessName) {
         this.id = id;
         this.javaProcessName = javaProcessName;
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> errorFuture) throws Exception {
+        this.status = status;
         log.info("Activating ProcessStopFault {}.", id);
+        this.status.update(new TextNode("stopping " + javaProcessName));
         sendSignals(platform, "SIGSTOP");
+        this.status.update(new TextNode("stopped " + javaProcessName));
     }
 
     @Override
     public void stop(Platform platform) throws Exception {
         log.info("Deactivating ProcessStopFault {}.", id);
+        this.status.update(new TextNode("resuming " + javaProcessName));
         sendSignals(platform, "SIGCONT");
+        this.status.update(new TextNode("resumed " + javaProcessName));
     }
 
     private void sendSignals(Platform platform, String signalName) throws Exception {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
index 536d3f2..e8d6003 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
@@ -16,9 +16,9 @@
  */
 
 package org.apache.kafka.trogdor.rest;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -50,8 +50,9 @@ public class TaskDone extends TaskState {
             @JsonProperty("startedMs") long startedMs,
             @JsonProperty("doneMs") long doneMs,
             @JsonProperty("error") String error,
-            @JsonProperty("cancelled") boolean cancelled) {
-        super(spec);
+            @JsonProperty("cancelled") boolean cancelled,
+            @JsonProperty("status") JsonNode status) {
+        super(spec, status);
         this.startedMs = startedMs;
         this.doneMs = doneMs;
         this.error = error;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
index b0162d3..7831301 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
@@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -27,6 +28,6 @@ import org.apache.kafka.trogdor.task.TaskSpec;
 public class TaskPending extends TaskState {
     @JsonCreator
     public TaskPending(@JsonProperty("spec") TaskSpec spec) {
-        super(spec);
+        super(spec, NullNode.instance);
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
index bff3676..7a81bdf 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
@@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -32,8 +33,9 @@ public class TaskRunning extends TaskState {
 
     @JsonCreator
     public TaskRunning(@JsonProperty("spec") TaskSpec spec,
-            @JsonProperty("startedMs") long startedMs) {
-        super(spec);
+            @JsonProperty("startedMs") long startedMs,
+            @JsonProperty("status") JsonNode status) {
+        super(spec, status);
         this.startedMs = startedMs;
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
index 28b6108..0764e14 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
@@ -20,6 +20,8 @@ package org.apache.kafka.trogdor.rest;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -37,12 +39,20 @@ import org.apache.kafka.trogdor.task.TaskSpec;
 public abstract class TaskState extends Message {
     private final TaskSpec spec;
 
-    public TaskState(TaskSpec spec) {
+    private final JsonNode status;
+
+    public TaskState(TaskSpec spec, JsonNode status) {
         this.spec = spec;
+        this.status = status == null ? NullNode.instance : status;
     }
 
     @JsonProperty
     public TaskSpec spec() {
         return spec;
     }
+
+    @JsonProperty
+    public JsonNode status() {
+        return status;
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
index 4446b75..d40b43c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
@@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -32,8 +33,9 @@ public class TaskStopping extends TaskState {
 
     @JsonCreator
     public TaskStopping(@JsonProperty("spec") TaskSpec spec,
-            @JsonProperty("startedMs") long startedMs) {
-        super(spec);
+            @JsonProperty("startedMs") long startedMs,
+            @JsonProperty("status") JsonNode status) {
+        super(spec, status);
         this.startedMs = startedMs;
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
index e463ffc..500d3c6 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
@@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -39,7 +41,7 @@ public class WorkerDone extends WorkerState {
      * The task status.  The format will depend on the type of task that is
      * being run.
      */
-    private final String status;
+    private final JsonNode status;
 
     /**
      * Empty if the task completed without error; the error message otherwise.
@@ -50,12 +52,12 @@ public class WorkerDone extends WorkerState {
     public WorkerDone(@JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
             @JsonProperty("doneMs") long doneMs,
-            @JsonProperty("status") String status,
+            @JsonProperty("status") JsonNode status,
             @JsonProperty("error") String error) {
         super(spec);
         this.startedMs = startedMs;
         this.doneMs = doneMs;
-        this.status = status == null ? "" : status;
+        this.status = status == null ? NullNode.instance : status;
         this.error = error == null ? "" : error;
     }
 
@@ -72,7 +74,7 @@ public class WorkerDone extends WorkerState {
 
     @JsonProperty
     @Override
-    public String status() {
+    public JsonNode status() {
         return status;
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
index d3e3565..7068774 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
@@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -30,4 +32,9 @@ public final class WorkerReceiving extends WorkerState {
     public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) {
         super(spec);
     }
+
+    @Override
+    public JsonNode status() {
+        return new TextNode("receiving");
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
index e3b8d19..af8ee88 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
@@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -34,15 +36,15 @@ public class WorkerRunning extends WorkerState {
      * The task status.  The format will depend on the type of task that is
      * being run.
      */
-    private final String status;
+    private final JsonNode status;
 
     @JsonCreator
     public WorkerRunning(@JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
-            @JsonProperty("status") String status) {
+            @JsonProperty("status") JsonNode status) {
         super(spec);
         this.startedMs = startedMs;
-        this.status = status == null ? "" : status;
+        this.status = status == null ? NullNode.instance : status;
     }
 
     @JsonProperty
@@ -53,7 +55,7 @@ public class WorkerRunning extends WorkerState {
 
     @JsonProperty
     @Override
-    public String status() {
+    public JsonNode status() {
         return status;
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
index 3a766ea..b568ec1 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
@@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -29,4 +31,9 @@ public final class WorkerStarting extends WorkerState {
     public WorkerStarting(@JsonProperty("spec") TaskSpec spec) {
         super(spec);
     }
+
+    @Override
+    public JsonNode status() {
+        return new TextNode("starting");
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
index 6d7c687..044d719 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
@@ -20,6 +20,7 @@ package org.apache.kafka.trogdor.rest;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
@@ -60,9 +61,7 @@ public abstract class WorkerState extends Message {
         throw new KafkaException("invalid state");
     }
 
-    public String status() {
-        throw new KafkaException("invalid state");
-    }
+    public abstract JsonNode status();
 
     public boolean running() {
         return false;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
index 777e511..9fbb3ff 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
@@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
@@ -34,15 +36,15 @@ public class WorkerStopping extends WorkerState {
      * The task status.  The format will depend on the type of task that is
      * being run.
      */
-    private final String status;
+    private final JsonNode status;
 
     @JsonCreator
     public WorkerStopping(@JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
-            @JsonProperty("status") String status) {
+            @JsonProperty("status") JsonNode status) {
         super(spec);
         this.startedMs = startedMs;
-        this.status = status == null ? "" : status;
+        this.status = status == null ? NullNode.instance : status;
     }
 
     @JsonProperty
@@ -53,7 +55,7 @@ public class WorkerStopping extends WorkerState {
 
     @JsonProperty
     @Override
-    public String status() {
+    public JsonNode status() {
         return status;
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java b/tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java
similarity index 57%
copy from tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
copy to tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java
index 3a766ea..2ad8e4e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java
@@ -15,18 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.trogdor.rest;
+package org.apache.kafka.trogdor.task;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
 
 /**
- * When we have just started a worker.
+ * Tracks the status of a Trogdor worker.
  */
-public final class WorkerStarting extends WorkerState {
-    @JsonCreator
-    public WorkerStarting(@JsonProperty("spec") TaskSpec spec) {
-        super(spec);
+public class AgentWorkerStatusTracker implements WorkerStatusTracker {
+    private JsonNode status = NullNode.instance;
+
+    @Override
+    public void update(JsonNode newStatus) {
+        JsonNode status = newStatus.deepCopy();
+        synchronized (this) {
+            this.status = status;
+        }
+    }
+
+    /**
+     * Retrieves the status.
+     */
+    public synchronized JsonNode get() {
+        return status;
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
index dfa8084..77336d8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
@@ -17,30 +17,34 @@
 
 package org.apache.kafka.trogdor.task;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Platform;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 public class NoOpTaskWorker implements TaskWorker {
     private static final Logger log = LoggerFactory.getLogger(NoOpTaskWorker.class);
 
     private final String id;
 
+    private WorkerStatusTracker status;
+
     public NoOpTaskWorker(String id) {
         this.id = id;
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> errorFuture) throws Exception {
         log.info("{}: Activating NoOpTask.", id);
+        this.status = status;
+        this.status.update(new TextNode("active"));
     }
 
     @Override
     public void stop(Platform platform) throws Exception {
         log.info("{}: Deactivating NoOpTask.", id);
+        this.status.update(new TextNode("done"));
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
index 288eb9c..042568f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
@@ -20,8 +20,6 @@ package org.apache.kafka.trogdor.task;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Platform;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * The agent-side interface for implementing tasks.
  */
@@ -42,7 +40,7 @@ public interface TaskWorker {
      *
      *
      * @param platform          The platform to use.
-     * @param status            The current status string.  The TaskWorker can update
+     * @param status            The current status.  The TaskWorker can update
      *                          this at any time to provide an updated status.
      * @param haltFuture        A future which the worker should complete if it halts.
      *                          If it is completed with an empty string, that means the task
@@ -53,7 +51,7 @@ public interface TaskWorker {
      *
      * @throws Exception        If the TaskWorker failed to start.  stop() will not be invoked.
      */
-    void start(Platform platform, AtomicReference<String> status, KafkaFutureImpl<String> haltFuture)
+    void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> haltFuture)
         throws Exception;
 
     /**
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java b/tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java
similarity index 67%
copy from tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
copy to tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java
index b0162d3..dfbc7ea 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.trogdor.rest;
+package org.apache.kafka.trogdor.task;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
+import com.fasterxml.jackson.databind.JsonNode;
 
 /**
- * The state for a task which is still pending.
+ * Tracks the status of a Trogdor worker.
  */
-public class TaskPending extends TaskState {
-    @JsonCreator
-    public TaskPending(@JsonProperty("spec") TaskSpec spec) {
-        super(spec);
-    }
+public interface WorkerStatusTracker {
+    /**
+     * Updates the status.
+     *
+     * @param status    The new status.
+     */
+    void update(JsonNode status);
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index a891b83..4c3095f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.workload;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -34,6 +35,7 @@ import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.common.WorkerUtils;
 import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +48,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class ProduceBenchWorker implements TaskWorker {
     private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
@@ -61,7 +62,7 @@ public class ProduceBenchWorker implements TaskWorker {
 
     private ScheduledExecutorService executor;
 
-    private AtomicReference<String> status;
+    private WorkerStatusTracker status;
 
     private KafkaFutureImpl<String> doneFuture;
 
@@ -81,7 +82,7 @@ public class ProduceBenchWorker implements TaskWorker {
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> doneFuture) throws Exception {
         if (!running.compareAndSet(false, true)) {
             throw new IllegalStateException("ProducerBenchWorker is already running.");
@@ -112,9 +113,10 @@ public class ProduceBenchWorker implements TaskWorker {
                     newTopics.put(name, new NewTopic(name, spec.numPartitions(),
                                                      spec.replicationFactor()));
                 }
+                status.update(new TextNode("Creating " + spec.totalTopics() + " topic(s)"));
                 WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
                                          spec.adminClientConf(), newTopics, false);
-
+                status.update(new TextNode("Created " + spec.totalTopics() + " topic(s)"));
                 executor.submit(new SendRecords());
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
@@ -181,7 +183,7 @@ public class ProduceBenchWorker implements TaskWorker {
             this.histogram = new Histogram(5000);
             int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
-                new StatusUpdater(histogram), 1, 1, TimeUnit.MINUTES);
+                new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS);
             Properties props = new Properties();
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
             // add common client configs to producer properties, and then user-specified producer
@@ -218,10 +220,10 @@ public class ProduceBenchWorker implements TaskWorker {
                 WorkerUtils.abort(log, "SendRecords", e, doneFuture);
             } finally {
                 statusUpdaterFuture.cancel(false);
-                new StatusUpdater(histogram).run();
+                StatusData statusData = new StatusUpdater(histogram).update();
                 long curTimeMs = Time.SYSTEM.milliseconds();
                 log.info("Sent {} total record(s) in {} ms.  status: {}",
-                    histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get());
+                    histogram.summarize().numSamples(), curTimeMs - startTimeMs, statusData);
             }
             doneFuture.complete("");
             return null;
@@ -234,46 +236,54 @@ public class ProduceBenchWorker implements TaskWorker {
 
     public class StatusUpdater implements Runnable {
         private final Histogram histogram;
-        private final float[] percentiles;
 
         StatusUpdater(Histogram histogram) {
             this.histogram = histogram;
-            this.percentiles = new float[] {0.50f, 0.95f, 0.99f};
         }
 
         @Override
         public void run() {
             try {
-                Histogram.Summary summary = histogram.summarize(percentiles);
-                StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
-                    summary.percentiles().get(0).value(),
-                    summary.percentiles().get(1).value(),
-                    summary.percentiles().get(2).value());
-                String statusDataString = JsonUtil.toJsonString(statusData);
-                status.set(statusDataString);
+                update();
             } catch (Exception e) {
                 WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
             }
         }
+
+        StatusData update() {
+            Histogram.Summary summary = histogram.summarize(StatusData.PERCENTILES);
+            StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
+                summary.percentiles().get(0).value(),
+                summary.percentiles().get(1).value(),
+                summary.percentiles().get(2).value());
+            status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
+            return statusData;
+        }
     }
 
     public static class StatusData {
         private final long totalSent;
         private final float averageLatencyMs;
         private final int p50LatencyMs;
-        private final int p90LatencyMs;
+        private final int p95LatencyMs;
         private final int p99LatencyMs;
 
+        /**
+         * The percentiles to use when calculating the histogram data.
+         * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields.
+         */
+        final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+
         @JsonCreator
         StatusData(@JsonProperty("totalSent") long totalSent,
                    @JsonProperty("averageLatencyMs") float averageLatencyMs,
                    @JsonProperty("p50LatencyMs") int p50latencyMs,
-                   @JsonProperty("p90LatencyMs") int p90latencyMs,
+                   @JsonProperty("p95LatencyMs") int p95latencyMs,
                    @JsonProperty("p99LatencyMs") int p99latencyMs) {
             this.totalSent = totalSent;
             this.averageLatencyMs = averageLatencyMs;
             this.p50LatencyMs = p50latencyMs;
-            this.p90LatencyMs = p90latencyMs;
+            this.p95LatencyMs = p95latencyMs;
             this.p99LatencyMs = p99latencyMs;
         }
 
@@ -293,8 +303,8 @@ public class ProduceBenchWorker implements TaskWorker {
         }
 
         @JsonProperty
-        public int p90LatencyMs() {
-            return p90LatencyMs;
+        public int p95LatencyMs() {
+            return p95LatencyMs;
         }
 
         @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 08b11ac..12b0c08 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -39,6 +39,7 @@ import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.common.WorkerUtils;
 import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +56,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class RoundTripWorker implements TaskWorker {
     private static final int THROTTLE_PERIOD_MS = 100;
@@ -98,7 +98,7 @@ public class RoundTripWorker implements TaskWorker {
     }
 
     @Override
-    public void start(Platform platform, AtomicReference<String> status,
+    public void start(Platform platform, WorkerStatusTracker status,
                       KafkaFutureImpl<String> doneFuture) throws Exception {
         if (!running.compareAndSet(false, true)) {
             throw new IllegalStateException("RoundTripWorker is already running.");
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 30d13b5..61de5c9 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.trogdor.agent;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Scheduler;
@@ -122,7 +123,7 @@ public class AgentTest {
         CreateWorkerResponse response = client.createWorker(new CreateWorkerRequest("foo", fooSpec));
         assertEquals(fooSpec.toString(), response.spec().toString());
         new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -131,10 +132,10 @@ public class AgentTest {
         client.createWorker(new CreateWorkerRequest("bar", barSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0, "")).
+                workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -142,13 +143,13 @@ public class AgentTest {
         client.createWorker(new CreateWorkerRequest("baz", bazSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0, "")).
+                workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("baz").
-                workerState(new WorkerRunning(bazSpec, 0, "")).
+                workerState(new WorkerRunning(bazSpec, 0, new TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -169,7 +170,7 @@ public class AgentTest {
         client.createWorker(new CreateWorkerRequest("foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -179,10 +180,10 @@ public class AgentTest {
         client.createWorker(new CreateWorkerRequest("bar", barSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 1, "")).
+                workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -190,10 +191,10 @@ public class AgentTest {
 
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 2, "", "")).
+                workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 1, "")).
+                workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -201,10 +202,10 @@ public class AgentTest {
         client.stopWorker(new StopWorkerRequest("bar"));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 2, "", "")).
+                workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerDone(barSpec, 1, 7, "", "")).
+                workerState(new WorkerDone(barSpec, 1, 7, new TextNode("done"), "")).
                 build()).
             waitFor(client);
 
@@ -221,34 +222,40 @@ public class AgentTest {
             maxTries(10).target("localhost", agent.port()).build();
         new ExpectedTasks().waitFor(client);
 
-        SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000, 1, "");
+        SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000,
+            Collections.singletonMap("node01", 1L), "");
         client.createWorker(new CreateWorkerRequest("foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, "")).
+                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
                 build()).
             waitFor(client);
 
-        SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000, 2, "baz");
+        SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000,
+            Collections.singletonMap("node01", 2L), "baz");
         client.createWorker(new CreateWorkerRequest("bar", barSpec));
 
         time.sleep(1);
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 1, "", "")).
+                workerState(new WorkerDone(fooSpec, 0, 1,
+                    new TextNode("halted"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0, "")).
+                workerState(new WorkerRunning(barSpec, 0,
+                    new TextNode("active"))).
                 build()).
             waitFor(client);
 
         time.sleep(1);
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 1, "", "")).
+                workerState(new WorkerDone(fooSpec, 0, 1,
+                    new TextNode("halted"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerDone(barSpec, 0, 2, "", "baz")).
+                workerState(new WorkerDone(barSpec, 0, 2,
+                    new TextNode("halted"), "baz")).
                 build()).
             waitFor(client);
     }
@@ -289,7 +296,7 @@ public class AgentTest {
             client.createWorker(new CreateWorkerRequest("foo", fooSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerRunning(fooSpec, 0, "")).
+                    workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).
                     build()).
                 waitFor(client);
             Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
@@ -299,9 +306,9 @@ public class AgentTest {
             client.createWorker(new CreateWorkerRequest("bar", barSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerRunning(fooSpec, 0, "")).build()).
+                    workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).build()).
                 addTask(new ExpectedTaskBuilder("bar").
-                    workerState(new WorkerRunning(barSpec, 0, "")).build()).
+                    workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()).
                 waitFor(client);
             Assert.assertEquals(new KiboshControlFile(new ArrayList<Kibosh.KiboshFaultSpec>() {{
                     add(new KiboshFilesUnreadableFaultSpec("/foo", 123));
@@ -311,9 +318,9 @@ public class AgentTest {
             client.stopWorker(new StopWorkerRequest("foo"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerDone(fooSpec, 0, 1, "", "")).build()).
+                    workerState(new WorkerDone(fooSpec, 0, 1, new TextNode("Removed fault foo"), "")).build()).
                 addTask(new ExpectedTaskBuilder("bar").
-                    workerState(new WorkerRunning(barSpec, 0, "")).build()).
+                    workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()).
                 waitFor(client);
             Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
                 new KiboshFilesUnreadableFaultSpec("/bar", 456))), mockKibosh.read());
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 76b206b..8101d9c 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -52,7 +52,7 @@ public class JsonSerializationTest {
             0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 3));
         verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
             0, null, null, 0));
-        verify(new SampleTaskSpec(0, 0, 0, null));
+        verify(new SampleTaskSpec(0, 0, null, null));
     }
 
     private <T> void verify(T val1) throws Exception {
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 004702f..34d7ffe 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.trogdor.coordinator;
 
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Scheduler;
@@ -41,6 +44,7 @@ import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.task.NoOpTaskSpec;
+import org.apache.kafka.trogdor.task.SampleTaskSpec;
 import org.junit.Rule;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
@@ -49,6 +53,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -94,8 +99,8 @@ public class CoordinatorTest {
             time.sleep(2);
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskRunning(fooSpec, 2)).
-                    workerState(new WorkerRunning(fooSpec, 2, "")).
+                    taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
+                    workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
                     build()).
                 waitFor(cluster.coordinatorClient()).
                 waitFor(cluster.agentClient("node02"));
@@ -103,7 +108,7 @@ public class CoordinatorTest {
             time.sleep(3);
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskDone(fooSpec, 2, 5, "", false)).
+                    taskState(new TaskDone(fooSpec, 2, 5, "", false, new TextNode("done"))).
                     build()).
                 waitFor(cluster.coordinatorClient());
         }
@@ -131,26 +136,34 @@ public class CoordinatorTest {
             NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2);
             coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
             new ExpectedTasks().
-                addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()).
+                addTask(new ExpectedTaskBuilder("foo").taskState(
+                    new TaskPending(fooSpec)).build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
             time.sleep(11);
+            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+            status1.set("node01", new TextNode("active"));
+            status1.set("node02", new TextNode("active"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskRunning(fooSpec, 11)).
-                    workerState(new WorkerRunning(fooSpec, 11, "")).
+                    taskState(new TaskRunning(fooSpec, 11, status1)).
+                    workerState(new WorkerRunning(fooSpec, 11,  new TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
             time.sleep(2);
+            ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
+            status2.set("node01", new TextNode("done"));
+            status2.set("node02", new TextNode("done"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskDone(fooSpec, 11, 13, "", false)).
-                    workerState(new WorkerDone(fooSpec, 11, 13, "", "")).
+                    taskState(new TaskDone(fooSpec, 11, 13,
+                        "", false, status2)).
+                    workerState(new WorkerDone(fooSpec, 11, 13, new TextNode("done"), "")).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -186,21 +199,29 @@ public class CoordinatorTest {
                 waitFor(agentClient2);
 
             time.sleep(11);
+
+            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+            status1.set("node01", new TextNode("active"));
+            status1.set("node02", new TextNode("active"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskRunning(fooSpec, 11)).
-                    workerState(new WorkerRunning(fooSpec, 11, "")).
+                    taskState(new TaskRunning(fooSpec, 11, status1)).
+                    workerState(new WorkerRunning(fooSpec, 11, new TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
+            ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
+            status2.set("node01", new TextNode("done"));
+            status2.set("node02", new TextNode("done"));
             time.sleep(1);
             coordinatorClient.stopTask(new StopTaskRequest("foo"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskDone(fooSpec, 11, 12, "", true)).
-                    workerState(new WorkerDone(fooSpec, 11, 12, "", "")).
+                    taskState(new TaskDone(fooSpec, 11, 12, "",
+                        true, status2)).
+                    workerState(new WorkerDone(fooSpec, 11, 12, new TextNode("done"), "")).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -375,8 +396,8 @@ public class CoordinatorTest {
             time.sleep(2);
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    taskState(new TaskRunning(fooSpec, 2)).
-                    workerState(new WorkerRunning(fooSpec, 2, "")).
+                    taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
+                    workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
                     build()).
                 addTask(new ExpectedTaskBuilder("bar").
                     taskState(new TaskPending(barSpec)).
@@ -394,4 +415,73 @@ public class CoordinatorTest {
                 new TasksRequest(null, 3, 0, 0, 0)).tasks().size());
         }
     }
+
+    @Test
+    public void testWorkersExitingAtDifferentTimes() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node02").
+            addAgent("node03").
+            scheduler(scheduler).
+            build()) {
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+            new ExpectedTasks().waitFor(coordinatorClient);
+
+            HashMap<String, Long> nodeToExitMs = new HashMap<>();
+            nodeToExitMs.put("node02", 10L);
+            nodeToExitMs.put("node03", 20L);
+            SampleTaskSpec fooSpec =
+                new SampleTaskSpec(2, 100, nodeToExitMs, "");
+            coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskPending(fooSpec)).
+                    build()).
+                waitFor(coordinatorClient);
+
+            time.sleep(2);
+            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+            status1.set("node02", new TextNode("active"));
+            status1.set("node03", new TextNode("active"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2, status1)).
+                    workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(cluster.agentClient("node02")).
+                waitFor(cluster.agentClient("node03"));
+
+            time.sleep(10);
+            ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
+            status2.set("node02", new TextNode("halted"));
+            status2.set("node03", new TextNode("active"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2, status2)).
+                    workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(cluster.agentClient("node03"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2, status2)).
+                    workerState(new WorkerDone(fooSpec, 2, 12, new TextNode("halted"), "")).
+                    build()).
+                waitFor(cluster.agentClient("node02"));
+
+            time.sleep(10);
+            ObjectNode status3 = new ObjectNode(JsonNodeFactory.instance);
+            status3.set("node02", new TextNode("halted"));
+            status3.set("node03", new TextNode("halted"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskDone(fooSpec, 2, 22, "",
+                        false, status3)).
+                    build()).
+                waitFor(coordinatorClient);
+        }
+    }
 };
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
index 26fdfb2..38a160f 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
@@ -20,23 +20,28 @@ package org.apache.kafka.trogdor.task;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 public class SampleTaskSpec extends TaskSpec {
-    private final long exitMs;
+    private final Map<String, Long> nodeToExitMs;
     private final String error;
 
     @JsonCreator
     public SampleTaskSpec(@JsonProperty("startMs") long startMs,
                         @JsonProperty("durationMs") long durationMs,
-                        @JsonProperty("exitMs") long exitMs,
+                        @JsonProperty("nodeToExitMs") Map<String, Long> nodeToExitMs,
                         @JsonProperty("error") String error) {
         super(startMs, durationMs);
-        this.exitMs = exitMs;
+        this.nodeToExitMs = nodeToExitMs == null ? new HashMap<String, Long>() :
+            Collections.unmodifiableMap(nodeToExitMs);
         this.error = error == null ? "" : error;
     }
 
     @JsonProperty
-    public long exitMs() {
-        return exitMs;
+    public Map<String, Long> nodeToExitMs() {
+        return nodeToExitMs;
     }
 
     @JsonProperty
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
index ebac27e..ade055d 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.trogdor.task;
 
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
@@ -26,12 +27,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class SampleTaskWorker implements TaskWorker {
     private final SampleTaskSpec spec;
     private final ScheduledExecutorService executor;
     private Future<Void> future;
+    private WorkerStatusTracker status;
 
     SampleTaskWorker(SampleTaskSpec spec) {
         this.spec = spec;
@@ -41,17 +42,24 @@ public class SampleTaskWorker implements TaskWorker {
     }
 
     @Override
-    public synchronized void start(Platform platform, AtomicReference<String> status,
+    public synchronized void start(Platform platform, WorkerStatusTracker status,
                       final KafkaFutureImpl<String> haltFuture) throws Exception {
         if (this.future != null)
             return;
+        this.status = status;
+        this.status.update(new TextNode("active"));
+
+        Long exitMs = spec.nodeToExitMs().get(platform.curNode().name());
+        if (exitMs == null) {
+            exitMs = Long.MAX_VALUE;
+        }
         this.future = platform.scheduler().schedule(executor, new Callable<Void>() {
             @Override
             public Void call() throws Exception {
                 haltFuture.complete(spec.error());
                 return null;
             }
-        }, spec.exitMs());
+        }, exitMs);
     }
 
     @Override
@@ -59,5 +67,6 @@ public class SampleTaskWorker implements TaskWorker {
         this.future.cancel(false);
         this.executor.shutdown();
         this.executor.awaitTermination(1, TimeUnit.DAYS);
+        this.status.update(new TextNode("halted"));
     }
 };
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
index abd7e62..d8d4ca9 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
@@ -40,11 +40,11 @@ public class TaskSpecTest {
         } catch (InvalidTypeIdException e) {
         }
         String inputJson = "{\"class\":\"org.apache.kafka.trogdor.task.SampleTaskSpec\"," +
-            "\"startMs\":123,\"durationMs\":456,\"exitMs\":1000,\"error\":\"foo\"}";
+            "\"startMs\":123,\"durationMs\":456,\"nodeToExitMs\":{\"node01\":1000},\"error\":\"foo\"}";
         SampleTaskSpec spec = JsonUtil.JSON_SERDE.readValue(inputJson, SampleTaskSpec.class);
         assertEquals(123, spec.startMs());
         assertEquals(456, spec.durationMs());
-        assertEquals(1000, spec.exitMs());
+        assertEquals(Long.valueOf(1000), spec.nodeToExitMs().get("node01"));
         assertEquals("foo", spec.error());
         String outputJson = JsonUtil.toJsonString(spec);
         assertEquals(inputJson, outputJson);

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message