kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6696 Trogdor should support destroying tasks (#4759)
Date Mon, 16 Apr 2018 07:51:43 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 832b096  KAFKA-6696 Trogdor should support destroying tasks (#4759)
832b096 is described below

commit 832b096f4fcfde9e90d796470eb0ce2a26283e81
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Mon Apr 16 00:51:33 2018 -0700

    KAFKA-6696 Trogdor should support destroying tasks (#4759)
    
    Implement destroying tasks and workers.  This means erasing all record of them on the Coordinator and the Agent.
    
    Workers should be identified by unique 64-bit worker IDs, rather than by the names of the tasks they are implementing.  This ensures that when a task is destroyed and re-created with the same task ID, the old workers will not be treated as part of the new task instance.
    
    Fix some return results from RPCs.  In some cases RPCs were returning values that were never used.  Attempting to re-create the same task ID with different arguments should fail.  Add RequestConflictException to represent HTTP error code 409 (CONFLICT) for this scenario.
    
    If only one worker in a task stops, don't stop all the other workers for that task, unless the worker that stopped had an error.
    
    Reviewers: Anna Povzner <anna@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../java/org/apache/kafka/trogdor/agent/Agent.java |  16 +-
 .../apache/kafka/trogdor/agent/AgentClient.java    |  64 +++--
 .../kafka/trogdor/agent/AgentRestResource.java     |  35 ++-
 .../apache/kafka/trogdor/agent/WorkerManager.java  | 258 +++++++++++++++------
 .../kafka/trogdor/coordinator/Coordinator.java     |  23 +-
 .../trogdor/coordinator/CoordinatorClient.java     |  62 +++--
 .../coordinator/CoordinatorRestResource.java       |  34 ++-
 .../kafka/trogdor/coordinator/NodeManager.java     | 121 +++++++---
 .../kafka/trogdor/coordinator/TaskManager.java     | 248 +++++++++++++-------
 .../kafka/trogdor/rest/AgentStatusResponse.java    |   8 +-
 .../kafka/trogdor/rest/CreateTaskResponse.java     |  39 ----
 .../kafka/trogdor/rest/CreateWorkerRequest.java    |  18 +-
 .../kafka/trogdor/rest/CreateWorkerResponse.java   |  39 ----
 ...WorkerResponse.java => DestroyTaskRequest.java} |  15 +-
 ...TaskResponse.java => DestroyWorkerRequest.java} |  15 +-
 ...kRequest.java => RequestConflictException.java} |  19 +-
 .../kafka/trogdor/rest/RestExceptionMapper.java    |   9 +-
 .../apache/kafka/trogdor/rest/StopTaskRequest.java |   2 +-
 .../kafka/trogdor/rest/StopWorkerRequest.java      |  10 +-
 .../org/apache/kafka/trogdor/rest/WorkerDone.java  |   5 +-
 .../apache/kafka/trogdor/rest/WorkerReceiving.java |   5 +-
 .../apache/kafka/trogdor/rest/WorkerRunning.java   |   5 +-
 .../apache/kafka/trogdor/rest/WorkerStarting.java  |   5 +-
 .../org/apache/kafka/trogdor/rest/WorkerState.java |   9 +-
 .../apache/kafka/trogdor/rest/WorkerStopping.java  |   5 +-
 .../org/apache/kafka/trogdor/agent/AgentTest.java  | 133 ++++++++---
 .../apache/kafka/trogdor/common/ExpectedTasks.java |   7 +-
 .../trogdor/common/JsonSerializationTest.java      |   6 +-
 .../kafka/trogdor/common/MiniTrogdorCluster.java   |   2 +-
 .../kafka/trogdor/coordinator/CoordinatorTest.java |  94 +++++++-
 .../trogdor/rest/RestExceptionMapperTest.java      |  11 +-
 32 files changed, 870 insertions(+), 454 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2767132..64258bf 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -45,7 +45,7 @@
     <suppress checks="ClassDataAbstractionCoupling"
               files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(Errors|SaslAuthenticatorTest|AgentTest).java"/>
+              files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>
 
     <suppress checks="BooleanExpressionComplexity"
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 3b5b21e..0324d2d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -27,10 +27,9 @@ import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,13 +94,16 @@ public final class Agent {
         return new AgentStatusResponse(serverStartMs, workerManager.workerStates());
     }
 
-    public CreateWorkerResponse createWorker(CreateWorkerRequest req) throws Exception {
-        workerManager.createWorker(req.id(), req.spec());
-        return new CreateWorkerResponse(req.spec());
+    public void createWorker(CreateWorkerRequest req) throws Throwable {
+        workerManager.createWorker(req.workerId(), req.taskId(), req.spec());
     }
 
-    public StopWorkerResponse stopWorker(StopWorkerRequest req) throws Exception {
-        return new StopWorkerResponse(workerManager.stopWorker(req.id()));
+    public void stopWorker(StopWorkerRequest req) throws Throwable {
+        workerManager.stopWorker(req.workerId(), false);
+    }
+
+    public void destroyWorker(DestroyWorkerRequest req) throws Throwable {
+        workerManager.stopWorker(req.workerId(), true);
     }
 
     public static void main(String[] args) throws Exception {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
index 08769a0..c89011b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
@@ -27,15 +27,16 @@ import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.UriBuilder;
+
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
@@ -116,20 +117,29 @@ public class AgentClient {
         return resp.body();
     }
 
-    public CreateWorkerResponse createWorker(CreateWorkerRequest request) throws Exception {
-        HttpResponse<CreateWorkerResponse> resp =
-            JsonRestServer.<CreateWorkerResponse>httpRequest(
+    public void createWorker(CreateWorkerRequest request) throws Exception {
+        HttpResponse<Empty> resp =
+            JsonRestServer.<Empty>httpRequest(
                 url("/agent/worker/create"), "POST",
-                request, new TypeReference<CreateWorkerResponse>() { }, maxTries);
-        return resp.body();
+                request, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
     }
 
-    public StopWorkerResponse stopWorker(StopWorkerRequest request) throws Exception {
-        HttpResponse<StopWorkerResponse> resp =
-            JsonRestServer.<StopWorkerResponse>httpRequest(url(
+    public void stopWorker(StopWorkerRequest request) throws Exception {
+        HttpResponse<Empty> resp =
+            JsonRestServer.<Empty>httpRequest(url(
                 "/agent/worker/stop"), "PUT",
-                request, new TypeReference<StopWorkerResponse>() { }, maxTries);
-        return resp.body();
+                request, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
+    }
+
+    public void destroyWorker(DestroyWorkerRequest request) throws Exception {
+        UriBuilder uriBuilder = UriBuilder.fromPath(url("/agent/worker"));
+        uriBuilder.queryParam("workerId", request.workerId());
+        HttpResponse<Empty> resp =
+            JsonRestServer.<Empty>httpRequest(uriBuilder.build().toString(), "DELETE",
+                null, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
     }
 
     public void invokeShutdown() throws Exception {
@@ -166,10 +176,16 @@ public class AgentClient {
             .help("Create a new fault.");
         actions.addArgument("--stop-worker")
             .action(store())
-            .type(String.class)
+            .type(Long.class)
             .dest("stop_worker")
-            .metavar("SPEC_JSON")
-            .help("Create a new fault.");
+            .metavar("WORKER_ID")
+            .help("Stop a worker ID.");
+        actions.addArgument("--destroy-worker")
+            .action(store())
+            .type(Long.class)
+            .dest("destroy_worker")
+            .metavar("WORKER_ID")
+            .help("Destroy a worker ID.");
         actions.addArgument("--shutdown")
             .action(storeTrue())
             .type(Boolean.class)
@@ -197,13 +213,21 @@ public class AgentClient {
             System.out.println("Got agent status: " +
                 JsonUtil.toPrettyJsonString(client.status()));
         } else if (res.getString("create_worker") != null) {
-            client.createWorker(JsonUtil.JSON_SERDE.
-                readValue(res.getString("create_worker"),
-                    CreateWorkerRequest.class));
-            System.out.println("Created fault.");
+            CreateWorkerRequest req = JsonUtil.JSON_SERDE.
+                readValue(res.getString("create_worker"), CreateWorkerRequest.class);
+            client.createWorker(req);
+            System.out.printf("Sent CreateWorkerRequest for worker %d%n.", req.workerId());
+        } else if (res.getString("stop_worker") != null) {
+            long workerId = res.getLong("stop_worker");
+            client.stopWorker(new StopWorkerRequest(workerId));
+            System.out.printf("Sent StopWorkerRequest for worker %d%n.", workerId);
+        } else if (res.getString("destroy_worker") != null) {
+            long workerId = res.getLong("stop_worker");
+            client.destroyWorker(new DestroyWorkerRequest(workerId));
+            System.out.printf("Sent DestroyWorkerRequest for worker %d%n.", workerId);
         } else if (res.getBoolean("shutdown")) {
             client.invokeShutdown();
-            System.out.println("Sent shutdown request.");
+            System.out.println("Sent ShutdownRequest.");
         } else {
             System.out.println("You must choose an action. Type --help for help.");
             Exit.exit(1);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
index 773c580..1f2ad49 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
@@ -18,22 +18,34 @@ package org.apache.kafka.trogdor.agent;
 
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import java.util.concurrent.atomic.AtomicReference;
 
-
+/**
+ * The REST resource for the Agent. This describes the RPCs which the agent can accept.
+ *
+ * RPCs should be idempotent.  This is important because if the server's response is
+ * lost, the client will simply retransmit the same request. The server's response must
+ * be the same the second time around.
+ *
+ * We return the empty JSON object {} rather than void for RPCs that have no results.
+ * This ensures that if we want to add more return results later, we can do so in a
+ * compatible way.
+ */
 @Path("/agent")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
@@ -55,14 +67,23 @@ public class AgentRestResource {
 
     @POST
     @Path("/worker/create")
-    public CreateWorkerResponse createWorker(CreateWorkerRequest req) throws Throwable {
-        return agent().createWorker(req);
+    public Empty createWorker(CreateWorkerRequest req) throws Throwable {
+        agent().createWorker(req);
+        return Empty.INSTANCE;
     }
 
     @PUT
     @Path("/worker/stop")
-    public StopWorkerResponse stopWorker(StopWorkerRequest req) throws Throwable {
-        return agent().stopWorker(req);
+    public Empty stopWorker(StopWorkerRequest req) throws Throwable {
+        agent().stopWorker(req);
+        return Empty.INSTANCE;
+    }
+
+    @DELETE
+    @Path("/worker")
+    public Empty destroyWorker(@DefaultValue("0") @QueryParam("workerId") long workerId) throws Throwable {
+        agent().destroyWorker(new DestroyWorkerRequest(workerId));
+        return Empty.INSTANCE;
     }
 
     @PUT
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index 7c8de6d..59d34c9 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.rest.WorkerStarting;
@@ -36,10 +37,12 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -72,7 +75,7 @@ public final class WorkerManager {
     /**
      * A map of task IDs to Work objects.
      */
-    private final Map<String, Worker> workers;
+    private final Map<Long, Worker> workers;
 
     /**
      * An ExecutorService used to schedule events in the future.
@@ -137,12 +140,15 @@ public final class WorkerManager {
                 return false;
             }
             shutdown = true;
+            if (refCount == 0) {
+                this.notifyAll();
+            }
             return true;
         }
 
         synchronized void waitForQuiescence() throws InterruptedException {
             while ((!shutdown) || (refCount > 0)) {
-                wait();
+                this.wait();
             }
         }
     }
@@ -174,9 +180,14 @@ public final class WorkerManager {
      */
     class Worker {
         /**
+         * The worker ID.
+         */
+        private final long workerId;
+
+        /**
          * The task ID.
          */
-        private final String id;
+        private final String taskId;
 
         /**
          * The task specification.
@@ -217,7 +228,7 @@ public final class WorkerManager {
          * If there is a task timeout scheduled, this is a future which can
          * be used to cancel it.
          */
-        private Future<TaskSpec> timeoutFuture = null;
+        private Future<Void> timeoutFuture = null;
 
         /**
          * A shutdown manager reference which will keep the WorkerManager
@@ -225,16 +236,26 @@ public final class WorkerManager {
          */
         private ShutdownManager.Reference reference;
 
-        Worker(String id, TaskSpec spec, long now) {
-            this.id = id;
+        /**
+         * Whether we should destroy the records of this worker once it stops.
+         */
+        private boolean mustDestroy = false;
+
+        Worker(long workerId, String taskId, TaskSpec spec, long now) {
+            this.workerId = workerId;
+            this.taskId = taskId;
             this.spec = spec;
-            this.taskWorker = spec.newTaskWorker(id);
+            this.taskWorker = spec.newTaskWorker(taskId);
             this.startedMs = now;
             this.reference = shutdownManager.takeReference();
         }
 
-        String id() {
-            return id;
+        long workerId() {
+            return workerId;
+        }
+
+        String taskId() {
+            return taskId;
         }
 
         TaskSpec spec() {
@@ -244,14 +265,14 @@ public final class WorkerManager {
         WorkerState state() {
             switch (state) {
                 case STARTING:
-                    return new WorkerStarting(spec);
+                    return new WorkerStarting(taskId, spec);
                 case RUNNING:
-                    return new WorkerRunning(spec, startedMs, status.get());
+                    return new WorkerRunning(taskId, spec, startedMs, status.get());
                 case CANCELLING:
                 case STOPPING:
-                    return new WorkerStopping(spec, startedMs, status.get());
+                    return new WorkerStopping(taskId, spec, startedMs, status.get());
                 case DONE:
-                    return new WorkerDone(spec, startedMs, doneMs, status.get(), error);
+                    return new WorkerDone(taskId, spec, startedMs, doneMs, status.get(), error);
             }
             throw new RuntimeException("unreachable");
         }
@@ -259,7 +280,7 @@ public final class WorkerManager {
         void transitionToRunning() {
             state = State.RUNNING;
             timeoutFuture = scheduler.schedule(stateChangeExecutor,
-                new StopWorker(id), spec.durationMs());
+                new StopWorker(workerId, false), spec.durationMs());
         }
 
         void transitionToStopping() {
@@ -268,7 +289,7 @@ public final class WorkerManager {
                 timeoutFuture.cancel(false);
                 timeoutFuture = null;
             }
-            workerCleanupExecutor.submit(new CleanupWorker(this));
+            workerCleanupExecutor.submit(new HaltWorker(this));
         }
 
         void transitionToDone() {
@@ -279,15 +300,20 @@ public final class WorkerManager {
                 reference = null;
             }
         }
+
+        @Override
+        public String toString() {
+            return String.format("%s_%d", taskId, workerId);
+        }
     }
 
-    public void createWorker(final String id, TaskSpec spec) throws Exception {
+    public void createWorker(long workerId, String taskId, TaskSpec spec) throws Throwable {
         try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
             final Worker worker = stateChangeExecutor.
-                submit(new CreateWorker(id, spec, time.milliseconds())).get();
+                submit(new CreateWorker(workerId, taskId, spec, time.milliseconds())).get();
             if (worker == null) {
                 log.info("{}: Ignoring request to create worker {}, because there is already " +
-                    "a worker with that id.", nodeName, id);
+                    "a worker with that id.", nodeName, workerId);
                 return;
             }
             KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
@@ -297,9 +323,10 @@ public final class WorkerManager {
                     if (errorString == null)
                         errorString = "";
                     if (errorString.isEmpty()) {
-                        log.info("{}: Worker {} is halting.", nodeName, id);
+                        log.info("{}: Worker {} is halting.", nodeName, worker);
                     } else {
-                        log.info("{}: Worker {} is halting with error {}", nodeName, id, errorString);
+                        log.info("{}: Worker {} is halting with error {}",
+                            nodeName, worker, errorString);
                     }
                     stateChangeExecutor.submit(
                         new HandleWorkerHalting(worker, errorString, false));
@@ -309,11 +336,20 @@ public final class WorkerManager {
             try {
                 worker.taskWorker.start(platform, worker.status, haltFuture);
             } catch (Exception e) {
-                log.info("{}: Worker {} start() exception", nodeName, id, e);
+                log.info("{}: Worker {} start() exception", nodeName, worker, e);
                 stateChangeExecutor.submit(new HandleWorkerHalting(worker,
                     "worker.start() exception: " + Utils.stackTrace(e), true));
             }
             stateChangeExecutor.submit(new FinishCreatingWorker(worker));
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof RequestConflictException) {
+                log.info("{}: request conflict while creating worker {} for task {} with spec {}.",
+                    nodeName, workerId, taskId, spec);
+            } else {
+                log.info("{}: Error creating worker {} for task {} with spec {}",
+                    nodeName, workerId, taskId, spec, e);
+            }
+            throw e.getCause();
         }
     }
 
@@ -321,27 +357,42 @@ public final class WorkerManager {
      * Handles a request to create a new worker.  Processed by the state change thread.
      */
     class CreateWorker implements Callable<Worker> {
-        private final String id;
+        private final long workerId;
+        private final String taskId;
         private final TaskSpec spec;
         private final long now;
 
-        CreateWorker(String id, TaskSpec spec, long now) {
-            this.id = id;
+        CreateWorker(long workerId, String taskId, TaskSpec spec, long now) {
+            this.workerId = workerId;
+            this.taskId = taskId;
             this.spec = spec;
             this.now = now;
         }
 
         @Override
         public Worker call() throws Exception {
-            Worker worker = workers.get(id);
-            if (worker != null) {
-                log.info("{}: Task ID {} is already in use.", nodeName, id);
-                return null;
+            try {
+                Worker worker = workers.get(workerId);
+                if (worker != null) {
+                    if (!worker.taskId().equals(taskId)) {
+                        throw new RequestConflictException("There is already a worker ID " + workerId +
+                            " with a different task ID.");
+                    } else if (!worker.spec().equals(spec)) {
+                        throw new RequestConflictException("There is already a worker ID " + workerId +
+                            " with a different task spec.");
+                    } else {
+                        return null;
+                    }
+                }
+                worker = new Worker(workerId, taskId, spec, now);
+                workers.put(workerId, worker);
+                log.info("{}: Created worker {} with spec {}", nodeName, worker, spec);
+                return worker;
+            } catch (Exception e) {
+                log.info("{}: unable to create worker {} for task {}, with spec {}",
+                    nodeName, workerId, taskId, spec, e);
+                throw e;
             }
-            worker = new Worker(id, spec, now);
-            workers.put(id, worker);
-            log.info("{}: Created a new worker for task {} with spec {}", nodeName, id, spec);
-            return worker;
         }
     }
 
@@ -360,12 +411,12 @@ public final class WorkerManager {
             switch (worker.state) {
                 case CANCELLING:
                     log.info("{}: Worker {} was cancelled while it was starting up.  " +
-                        "Transitioning to STOPPING.", nodeName, worker.id);
+                        "Transitioning to STOPPING.", nodeName, worker);
                     worker.transitionToStopping();
                     break;
                 case STARTING:
                     log.info("{}: Worker {} is now RUNNING.  Scheduled to stop in {} ms.",
-                        nodeName, worker.id, worker.spec.durationMs());
+                        nodeName, worker, worker.spec.durationMs());
                     worker.transitionToRunning();
                     break;
                 default:
@@ -400,29 +451,29 @@ public final class WorkerManager {
                 case STARTING:
                     if (startupHalt) {
                         log.info("{}: Worker {} {} during startup.  Transitioning to DONE.",
-                            nodeName, worker.id, verb);
+                            nodeName, worker, verb);
                         worker.transitionToDone();
                     } else {
                         log.info("{}: Worker {} {} during startup.  Transitioning to CANCELLING.",
-                            nodeName, worker.id, verb);
+                            nodeName, worker, verb);
                         worker.state = State.CANCELLING;
                     }
                     break;
                 case CANCELLING:
                     log.info("{}: Cancelling worker {} {}.  ",
-                            nodeName, worker.id, verb);
+                            nodeName, worker, verb);
                     break;
                 case RUNNING:
                     log.info("{}: Running worker {} {}.  Transitioning to STOPPING.",
-                        nodeName, worker.id, verb);
+                        nodeName, worker, verb);
                     worker.transitionToStopping();
                     break;
                 case STOPPING:
-                    log.info("{}: Stopping worker {} {}.", nodeName, worker.id, verb);
+                    log.info("{}: Stopping worker {} {}.", nodeName, worker, verb);
                     break;
                 case DONE:
                     log.info("{}: Can't halt worker {} because it is already DONE.",
-                        nodeName, worker.id);
+                        nodeName, worker);
                     break;
             }
             return null;
@@ -432,7 +483,7 @@ public final class WorkerManager {
     /**
      * Transitions a worker to WorkerDone.  Processed by the state change thread.
      */
-    static class CompleteWorker implements Callable<Void> {
+    class CompleteWorker implements Callable<Void> {
         private final Worker worker;
 
         private final String failure;
@@ -448,60 +499,79 @@ public final class WorkerManager {
                 worker.error = failure;
             }
             worker.transitionToDone();
+            if (worker.mustDestroy) {
+                log.info("{}: destroying worker {} with error {}",
+                    nodeName, worker, worker.error);
+                workers.remove(worker.workerId);
+            } else {
+                log.info("{}: completed worker {} with error {}",
+                    nodeName, worker, worker.error);
+            }
             return null;
         }
     }
 
-    public TaskSpec stopWorker(String id) throws Exception {
+    public void stopWorker(long workerId, boolean mustDestroy) throws Throwable {
         try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
-            TaskSpec taskSpec = stateChangeExecutor.submit(new StopWorker(id)).get();
-            if (taskSpec == null) {
-                throw new KafkaException("No task found with id " + id);
-            }
-            return taskSpec;
+            stateChangeExecutor.submit(new StopWorker(workerId, mustDestroy)).get();
+        } catch (ExecutionException e) {
+            throw e.getCause();
         }
     }
 
     /**
      * Stops a worker.  Processed by the state change thread.
      */
-    class StopWorker implements Callable<TaskSpec> {
-        private final String id;
+    class StopWorker implements Callable<Void> {
+        private final long workerId;
+        private final boolean mustDestroy;
 
-        StopWorker(String id) {
-            this.id = id;
+        StopWorker(long workerId, boolean mustDestroy) {
+            this.workerId = workerId;
+            this.mustDestroy = mustDestroy;
         }
 
         @Override
-        public TaskSpec call() throws Exception {
-            Worker worker = workers.get(id);
+        public Void call() throws Exception {
+            Worker worker = workers.get(workerId);
             if (worker == null) {
+                log.info("{}: Can't stop worker {} because there is no worker with that ID.",
+                    nodeName, workerId);
                 return null;
             }
+            if (mustDestroy) {
+                worker.mustDestroy = true;
+            }
             switch (worker.state) {
                 case STARTING:
                     log.info("{}: Cancelling worker {} during its startup process.",
-                        nodeName, id);
+                        nodeName, worker);
                     worker.state = State.CANCELLING;
                     break;
                 case CANCELLING:
                     log.info("{}: Can't stop worker {}, because it is already being " +
-                        "cancelled.", nodeName, id);
+                        "cancelled.", nodeName, worker);
                     break;
                 case RUNNING:
-                    log.info("{}: Stopping running worker {}.", nodeName, id);
+                    log.info("{}: Stopping running worker {}.", nodeName, worker);
                     worker.transitionToStopping();
                     break;
                 case STOPPING:
                     log.info("{}: Can't stop worker {}, because it is already " +
-                            "stopping.", nodeName, id);
+                            "stopping.", nodeName, worker);
                     break;
                 case DONE:
-                    log.debug("{}: Can't stop worker {}, because it is already done.",
-                        nodeName, id);
+                    if (worker.mustDestroy) {
+                        log.info("{}: destroying worker {} with error {}",
+                            nodeName, worker, worker.error);
+                        workers.remove(worker.workerId);
+                    } else {
+                        log.debug("{}: Can't stop worker {}, because it is already done.",
+                            nodeName, worker);
+                    }
                     break;
             }
-            return worker.spec();
+            return null;
         }
     }
 
@@ -509,10 +579,10 @@ public final class WorkerManager {
      * Cleans up the resources associated with a worker.  Processed by the worker
      * cleanup thread pool.
      */
-    class CleanupWorker implements Callable<Void> {
+    class HaltWorker implements Callable<Void> {
         private final Worker worker;
 
-        CleanupWorker(Worker worker) {
+        HaltWorker(Worker worker) {
             this.worker = worker;
         }
 
@@ -530,18 +600,18 @@ public final class WorkerManager {
         }
     }
 
-    public TreeMap<String, WorkerState> workerStates() throws Exception {
+    public TreeMap<Long, WorkerState> workerStates() throws Exception {
         try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
             return stateChangeExecutor.submit(new GetWorkerStates()).get();
         }
     }
 
-    class GetWorkerStates implements Callable<TreeMap<String, WorkerState>> {
+    class GetWorkerStates implements Callable<TreeMap<Long, WorkerState>> {
         @Override
-        public TreeMap<String, WorkerState> call() throws Exception {
-            TreeMap<String, WorkerState> workerMap = new TreeMap<>();
+        public TreeMap<Long, WorkerState> call() throws Exception {
+            TreeMap<Long, WorkerState> workerMap = new TreeMap<>();
             for (Worker worker : workers.values()) {
-                workerMap.put(worker.id(), worker.state());
+                workerMap.put(worker.workerId(), worker.state());
             }
             return workerMap;
         }
@@ -562,17 +632,53 @@ public final class WorkerManager {
     class Shutdown implements Callable<Void> {
         @Override
         public Void call() throws Exception {
-            log.info("{}: Shutting down WorkerManager.", platform.curNode().name());
-            for (Worker worker : workers.values()) {
-                stateChangeExecutor.submit(new StopWorker(worker.id));
+            log.info("{}: Shutting down WorkerManager.", nodeName);
+            try {
+                stateChangeExecutor.submit(new DestroyAllWorkers()).get();
+                log.info("{}: Waiting for shutdownManager quiescence...", nodeName);
+                shutdownManager.waitForQuiescence();
+                workerCleanupExecutor.shutdownNow();
+                stateChangeExecutor.shutdownNow();
+                log.info("{}: Waiting for workerCleanupExecutor to terminate...", nodeName);
+                workerCleanupExecutor.awaitTermination(1, TimeUnit.DAYS);
+                log.info("{}: Waiting for stateChangeExecutor to terminate...", nodeName);
+                stateChangeExecutor.awaitTermination(1, TimeUnit.DAYS);
+                log.info("{}: Shutting down shutdownExecutor.", nodeName);
+                shutdownExecutor.shutdown();
+            } catch (Exception e) {
+                log.info("{}: Caught exception while shutting down WorkerManager", nodeName, e);
+                throw e;
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Begins the process of destroying all workers.  Processed by the state change thread.
+     */
+    class DestroyAllWorkers implements Callable<Void> {
+        @Override
+        public Void call() throws Exception {
+            log.info("{}: Destroying all workers.", nodeName);
+
+            // StopWorker may remove elements from the set of worker IDs.  That might generate
+            // a ConcurrentModificationException if we were iterating over the worker ID
+            // set directly.  Therefore, we make a copy of the worker IDs here and iterate
+            // over that instead.
+            //
+            // Note that there is no possible way that more worker IDs can be added while this
+            // callable is running, because the state change executor is single-threaded.
+            ArrayList<Long> workerIds = new ArrayList<>(workers.keySet());
+
+            for (long workerId : workerIds) {
+                try {
+                    new StopWorker(workerId, true).call();
+                } catch (Exception e) {
+                    log.error("Failed to stop worker {}", workerId, e);
+                }
             }
-            shutdownManager.waitForQuiescence();
-            workerCleanupExecutor.shutdownNow();
-            stateChangeExecutor.shutdownNow();
-            workerCleanupExecutor.awaitTermination(1, TimeUnit.DAYS);
-            stateChangeExecutor.awaitTermination(1, TimeUnit.DAYS);
-            shutdownExecutor.shutdown();
             return null;
         }
     }
+
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index 717d7c7..23f3ceb 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -27,15 +27,16 @@ import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;
-import org.apache.kafka.trogdor.rest.CreateTaskResponse;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
-import org.apache.kafka.trogdor.rest.StopTaskResponse;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ThreadLocalRandom;
+
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 
 /**
@@ -72,9 +73,9 @@ public final class Coordinator {
-     * @param resource      The AgentRestResoure to use.
+     * @param resource      The CoordinatorRestResource to use.
      */
     public Coordinator(Platform platform, Scheduler scheduler, JsonRestServer restServer,
-                       CoordinatorRestResource resource) {
+                       CoordinatorRestResource resource, long firstWorkerId) {
         this.startTimeMs = scheduler.time().milliseconds();
-        this.taskManager = new TaskManager(platform, scheduler);
+        this.taskManager = new TaskManager(platform, scheduler, firstWorkerId);
         this.restServer = restServer;
         resource.setCoordinator(this);
     }
@@ -87,12 +88,16 @@ public final class Coordinator {
         return new CoordinatorStatusResponse(startTimeMs);
     }
 
-    public CreateTaskResponse createTask(CreateTaskRequest request) throws Exception {
-        return new CreateTaskResponse(taskManager.createTask(request.id(), request.spec()));
+    public void createTask(CreateTaskRequest request) throws Throwable {
+        taskManager.createTask(request.id(), request.spec());
+    }
+
+    public void stopTask(StopTaskRequest request) throws Throwable {
+        taskManager.stopTask(request.id());
     }
 
-    public StopTaskResponse stopTask(StopTaskRequest request) throws Exception {
-        return new StopTaskResponse(taskManager.stopTask(request.id()));
+    public void destroyTask(DestroyTaskRequest request) throws Throwable {
+        taskManager.destroyTask(request.id());
     }
 
     public TasksResponse tasks(TasksRequest request) throws Exception {
@@ -149,7 +154,7 @@ public final class Coordinator {
         CoordinatorRestResource resource = new CoordinatorRestResource();
         log.info("Starting coordinator process.");
         final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM,
-            restServer, resource);
+            restServer, resource, ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
         restServer.start(resource);
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 0677296..780ae73 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -27,12 +27,11 @@ import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;
-import org.apache.kafka.trogdor.rest.CreateTaskResponse;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
-import org.apache.kafka.trogdor.rest.StopTaskResponse;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
@@ -116,36 +115,45 @@ public class CoordinatorClient {
         return resp.body();
     }
 
-    public CreateTaskResponse createTask(CreateTaskRequest request) throws Exception {
-        HttpResponse<CreateTaskResponse> resp =
-            JsonRestServer.<CreateTaskResponse>httpRequest(log, url("/coordinator/task/create"), "POST",
-                request, new TypeReference<CreateTaskResponse>() { }, maxTries);
-        return resp.body();
+    public void createTask(CreateTaskRequest request) throws Exception {
+        HttpResponse<Empty> resp =
+            JsonRestServer.httpRequest(log, url("/coordinator/task/create"), "POST",
+                request, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
     }
 
-    public StopTaskResponse stopTask(StopTaskRequest request) throws Exception {
-        HttpResponse<StopTaskResponse> resp =
-            JsonRestServer.<StopTaskResponse>httpRequest(log, url("/coordinator/task/stop"), "PUT",
-                request, new TypeReference<StopTaskResponse>() { }, maxTries);
-        return resp.body();
+    public void stopTask(StopTaskRequest request) throws Exception {
+        HttpResponse<Empty> resp =
+            JsonRestServer.httpRequest(log, url("/coordinator/task/stop"), "PUT",
+                request, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
+    }
+
+    public void destroyTask(DestroyTaskRequest request) throws Exception {
+        UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
+        uriBuilder.queryParam("taskId", request.id());
+        HttpResponse<Empty> resp =
+            JsonRestServer.httpRequest(log, uriBuilder.build().toString(), "DELETE",
+                null, new TypeReference<Empty>() { }, maxTries);
+        resp.body();
     }
 
     public TasksResponse tasks(TasksRequest request) throws Exception {
         UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
-        uriBuilder.queryParam("taskId", request.taskIds().toArray(new String[0]));
+        uriBuilder.queryParam("taskId", (Object[]) request.taskIds().toArray(new String[0]));
         uriBuilder.queryParam("firstStartMs", request.firstStartMs());
         uriBuilder.queryParam("lastStartMs", request.lastStartMs());
         uriBuilder.queryParam("firstEndMs", request.firstEndMs());
         uriBuilder.queryParam("lastEndMs", request.lastEndMs());
         HttpResponse<TasksResponse> resp =
-            JsonRestServer.<TasksResponse>httpRequest(log, uriBuilder.build().toString(), "GET",
+            JsonRestServer.httpRequest(log, uriBuilder.build().toString(), "GET",
                 null, new TypeReference<TasksResponse>() { }, maxTries);
         return resp.body();
     }
 
     public void shutdown() throws Exception {
         HttpResponse<Empty> resp =
-            JsonRestServer.<Empty>httpRequest(log, url("/coordinator/shutdown"), "PUT",
+            JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), "PUT",
                 null, new TypeReference<Empty>() { }, maxTries);
         resp.body();
     }
@@ -185,6 +193,12 @@ public class CoordinatorClient {
             .dest("stop_task")
             .metavar("TASK_ID")
             .help("Stop a task.");
+        actions.addArgument("--destroy-task")
+            .action(store())
+            .type(String.class)
+            .dest("destroy_task")
+            .metavar("TASK_ID")
+            .help("Destroy a task.");
         actions.addArgument("--shutdown")
             .action(storeTrue())
             .type(Boolean.class)
@@ -216,15 +230,21 @@ public class CoordinatorClient {
                 JsonUtil.toPrettyJsonString(client.tasks(
                     new TasksRequest(null, 0, 0, 0, 0))));
         } else if (res.getString("create_task") != null) {
-            client.createTask(JsonUtil.JSON_SERDE.readValue(res.getString("create_task"),
-                CreateTaskRequest.class));
-            System.out.println("Created task.");
+            CreateTaskRequest req = JsonUtil.JSON_SERDE.
+                readValue(res.getString("create_task"), CreateTaskRequest.class);
+            client.createTask(req);
+            System.out.printf("Sent CreateTaskRequest for task %s.", req.id());
         } else if (res.getString("stop_task") != null) {
-            client.stopTask(new StopTaskRequest(res.getString("stop_task")));
-            System.out.println("Created task.");
+            String taskId = res.getString("stop_task");
+            client.stopTask(new StopTaskRequest(taskId));
+            System.out.printf("Sent StopTaskRequest for task %s.%n", taskId);
+        } else if (res.getString("destroy_task") != null) {
+            String taskId = res.getString("destroy_task");
+            client.destroyTask(new DestroyTaskRequest(taskId));
+            System.out.printf("Sent DestroyTaskRequest for task %s.%n", taskId);
         } else if (res.getBoolean("shutdown")) {
             client.shutdown();
-            System.out.println("Sent shutdown request.");
+            System.out.println("Sent ShutdownRequest.");
         } else {
             System.out.println("You must choose an action. Type --help for help.");
             Exit.exit(1);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index b8663ec..cbfbddd 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -19,15 +19,15 @@ package org.apache.kafka.trogdor.coordinator;
 import org.apache.kafka.trogdor.rest.CoordinatorShutdownRequest;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;
-import org.apache.kafka.trogdor.rest.CreateTaskResponse;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
-import org.apache.kafka.trogdor.rest.StopTaskResponse;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -39,7 +39,18 @@ import javax.ws.rs.core.MediaType;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
-
+/**
+ * The REST resource for the Coordinator. This describes the RPCs which the coordinator
+ * can accept.
+ *
+ * RPCs should be idempotent.  This is important because if the server's response is
+ * lost, the client will simply retransmit the same request. The server's response must
+ * be the same the second time around.
+ *
+ * We return the empty JSON object {} rather than void for RPCs that have no results.
+ * This ensures that if we want to add more return results later, we can do so in a
+ * compatible way.
+ */
 @Path("/coordinator")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
@@ -61,14 +72,23 @@ public class CoordinatorRestResource {
 
     @POST
     @Path("/task/create")
-    public CreateTaskResponse createTask(CreateTaskRequest request) throws Throwable {
-        return coordinator().createTask(request);
+    public Empty createTask(CreateTaskRequest request) throws Throwable {
+        coordinator().createTask(request);
+        return Empty.INSTANCE;
     }
 
     @PUT
     @Path("/task/stop")
-    public StopTaskResponse stopTask(StopTaskRequest request) throws Throwable {
-        return coordinator().stopTask(request);
+    public Empty stopTask(StopTaskRequest request) throws Throwable {
+        coordinator().stopTask(request);
+        return Empty.INSTANCE;
+    }
+
+    @DELETE
+    @Path("/tasks")
+    public Empty destroyTask(@DefaultValue("") @QueryParam("taskId") String taskId) throws Throwable {
+        coordinator().destroyTask(new DestroyTaskRequest(taskId));
+        return Empty.INSTANCE;
     }
 
     @GET
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index 91ef9c2..3f0075e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -79,13 +79,16 @@ public final class NodeManager {
     private static final long HEARTBEAT_DELAY_MS = 1000L;
 
     class ManagedWorker {
-        private final String id;
+        private final long workerId;
+        private final String taskId;
         private final TaskSpec spec;
         private boolean shouldRun;
         private WorkerState state;
 
-        ManagedWorker(String id, TaskSpec spec, boolean shouldRun, WorkerState state) {
-            this.id = id;
+        ManagedWorker(long workerId, String taskId, TaskSpec spec,
+                      boolean shouldRun, WorkerState state) {
+            this.workerId = workerId;
+            this.taskId = taskId;
             this.spec = spec;
             this.shouldRun = shouldRun;
             this.state = state;
@@ -93,19 +96,24 @@ public final class NodeManager {
 
         void tryCreate() {
             try {
-                client.createWorker(new CreateWorkerRequest(id, spec));
+                client.createWorker(new CreateWorkerRequest(workerId, taskId, spec));
             } catch (Throwable e) {
-                log.error("{}: error creating worker {}.", node.name(), id, e);
+                log.error("{}: error creating worker {}.", node.name(), this, e);
             }
         }
 
         void tryStop() {
             try {
-                client.stopWorker(new StopWorkerRequest(id));
+                client.stopWorker(new StopWorkerRequest(workerId));
             } catch (Throwable e) {
-                log.error("{}: error stopping worker {}.", node.name(), id, e);
+                log.error("{}: error stopping worker {}.", node.name(), this, e);
             }
         }
+
+        @Override
+        public String toString() {
+            return String.format("%s_%d", taskId, workerId);
+        }
     }
 
     /**
@@ -126,7 +134,7 @@ public final class NodeManager {
     /**
-     * Maps task IDs to worker structures.
+     * Maps worker IDs to worker structures.
      */
-    private final Map<String, ManagedWorker> workers;
+    private final Map<Long, ManagedWorker> workers;
 
     /**
      * An executor service which manages the thread dedicated to this node.
@@ -196,24 +204,25 @@ public final class NodeManager {
                 }
                 // Identify workers which we think should be running, but which do not appear
                 // in the agent's response.  We need to send startWorker requests for these.
-                for (Map.Entry<String, ManagedWorker> entry : workers.entrySet()) {
-                    String id = entry.getKey();
-                    if (!agentStatus.workers().containsKey(id)) {
+                for (Map.Entry<Long, ManagedWorker> entry : workers.entrySet()) {
+                    Long workerId = entry.getKey();
+                    if (!agentStatus.workers().containsKey(workerId)) {
                         ManagedWorker worker = entry.getValue();
                         if (worker.shouldRun) {
                             worker.tryCreate();
                         }
                     }
                 }
-                for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
-                    String id = entry.getKey();
+                for (Map.Entry<Long, WorkerState> entry : agentStatus.workers().entrySet()) {
+                    long workerId = entry.getKey();
                     WorkerState state = entry.getValue();
-                    ManagedWorker worker = workers.get(id);
+                    ManagedWorker worker = workers.get(workerId);
                     if (worker == null) {
                         // Identify tasks which are running, but which we don't know about.
                         // Add these to the NodeManager as tasks that should not be running.
-                        log.warn("{}: scheduling unknown worker {} for stopping.", node.name(), id);
-                        workers.put(id, new ManagedWorker(id, state.spec(), false, state));
+                        log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId);
+                        workers.put(workerId, new ManagedWorker(workerId, state.taskId(),
+                            state.spec(), false, state));
                     } else {
                         // Handle workers which need to be stopped.
                         if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
@@ -227,7 +236,7 @@ public final class NodeManager {
                         } else {
                             log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state);
                             worker.state = state;
-                            taskManager.updateWorkerState(node.name(), worker.id, state);
+                            taskManager.updateWorkerState(node.name(), worker.workerId, state);
                         }
                     }
                 }
@@ -240,34 +249,39 @@ public final class NodeManager {
     /**
      * Create a new worker.
      *
-     * @param id                    The new worker id.
+     * @param workerId              The new worker id.
+     * @param taskId                The id of the task which the new worker belongs to.
      * @param spec                  The task specification to use with the new worker.
      */
-    public void createWorker(String id, TaskSpec spec) {
-        executor.submit(new CreateWorker(id, spec));
+    public void createWorker(long workerId, String taskId, TaskSpec spec) {
+        executor.submit(new CreateWorker(workerId, taskId, spec));
     }
 
     /**
      * Starts a worker.
      */
     class CreateWorker implements Callable<Void> {
-        private final String id;
+        private final long workerId;
+        private final String taskId;
         private final TaskSpec spec;
 
-        CreateWorker(String id, TaskSpec spec) {
-            this.id = id;
+        CreateWorker(long workerId, String taskId, TaskSpec spec) {
+            this.workerId = workerId;
+            this.taskId = taskId;
             this.spec = spec;
         }
 
         @Override
         public Void call() throws Exception {
-            ManagedWorker worker = workers.get(id);
+            ManagedWorker worker = workers.get(workerId);
             if (worker != null) {
-                log.error("{}: there is already a worker for task {}.", node.name(), id);
+                log.error("{}: there is already a worker {} with ID {}.",
+                    node.name(), worker, workerId);
                 return null;
             }
-            log.info("{}: scheduling worker {} to start.", node.name(), id);
-            workers.put(id, new ManagedWorker(id, spec, true, new WorkerReceiving(spec)));
+            worker = new ManagedWorker(workerId, taskId, spec, true, new WorkerReceiving(taskId, spec));
+            log.info("{}: scheduling worker {} to start.", node.name(), worker);
+            workers.put(workerId, worker);
             rescheduleNextHeartbeat(0);
             return null;
         }
@@ -276,41 +290,72 @@ public final class NodeManager {
     /**
      * Stop a worker.
      *
-     * @param id                    The id of the worker to stop.
+     * @param workerId              The id of the worker to stop.
      */
-    public void stopWorker(String id) {
-        executor.submit(new StopWorker(id));
+    public void stopWorker(long workerId) {
+        executor.submit(new StopWorker(workerId));
     }
 
     /**
      * Stops a worker.
      */
     class StopWorker implements Callable<Void> {
-        private final String id;
+        private final long workerId;
 
-        StopWorker(String id) {
-            this.id = id;
+        StopWorker(long workerId) {
+            this.workerId = workerId;
         }
 
         @Override
         public Void call() throws Exception {
-            ManagedWorker worker = workers.get(id);
+            ManagedWorker worker = workers.get(workerId);
             if (worker == null) {
-                log.error("{}: can't stop non-existent worker {}.", node.name(), id);
+                log.error("{}: unable to locate worker to stop with ID {}.", node.name(), workerId);
                 return null;
             }
             if (!worker.shouldRun) {
-                log.error("{}: The worker for task {} is already scheduled to stop.",
-                    node.name(), id);
+                log.error("{}: Worker {} is already scheduled to stop.",
+                    node.name(), worker);
                 return null;
             }
-            log.info("{}: scheduling worker {} on {} to stop.", node.name(), id);
+            log.info("{}: scheduling worker {} to stop.", node.name(), worker);
             worker.shouldRun = false;
             rescheduleNextHeartbeat(0);
             return null;
         }
     }
 
+    /**
+     * Destroy a worker.
+     *
+     * @param workerId              The id of the worker to destroy.
+     */
+    public void destroyWorker(long workerId) {
+        executor.submit(new DestroyWorker(workerId));
+    }
+
+    /**
+     * Destroys a worker.
+     */
+    class DestroyWorker implements Callable<Void> {
+        private final long workerId;
+
+        DestroyWorker(long workerId) {
+            this.workerId = workerId;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            ManagedWorker worker = workers.remove(workerId);
+            if (worker == null) {
+                log.error("{}: unable to locate worker to destroy with ID {}.", node.name(), workerId);
+                return null;
+            }
+            rescheduleNextHeartbeat(0);
+            return null;
+        }
+    }
+
     public void beginShutdown(boolean stopNode) {
         executor.shutdownNow();
         if (stopNode) {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 7e19c8b..74082bd 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -28,6 +29,7 @@ import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
 import org.apache.kafka.trogdor.rest.TaskRunning;
@@ -107,11 +109,21 @@ public final class TaskManager {
     private final Map<String, NodeManager> nodeManagers;
 
     /**
+     * The states of all workers.
+     */
+    private final Map<Long, WorkerState> workerStates = new HashMap<>();
+
+    /**
      * True if the TaskManager is shut down.
      */
     private AtomicBoolean shutdown = new AtomicBoolean(false);
 
-    TaskManager(Platform platform, Scheduler scheduler) {
+    /**
+     * The ID to use for the next worker.  Only accessed by the state change thread.
+     */
+    private long nextWorkerId;
+
+    TaskManager(Platform platform, Scheduler scheduler, long firstWorkerId) {
         this.platform = platform;
         this.scheduler = scheduler;
         this.time = scheduler.time();
@@ -119,6 +131,7 @@ public final class TaskManager {
         this.executor = Executors.newSingleThreadScheduledExecutor(
             ThreadUtils.createThreadFactory("TaskManagerStateThread", false));
         this.nodeManagers = new HashMap<>();
+        this.nextWorkerId = firstWorkerId;
         for (Node node : platform.topology().nodes().values()) {
             if (Node.Util.getTrogdorAgentPort(node) > 0) {
                 this.nodeManagers.put(node.name(), new NodeManager(node, this));
@@ -178,9 +191,9 @@ public final class TaskManager {
         private Future<?> startFuture = null;
 
         /**
-         * The states of the workers involved with this task.
+         * Maps node names to worker IDs.
          */
-        public Map<String, WorkerState> workerStates = new TreeMap<>();
+        public TreeMap<String, Long> workerIds = new TreeMap<>();
 
         /**
          * If this is non-empty, a message describing how this task failed.
@@ -240,38 +253,42 @@ public final class TaskManager {
                 case PENDING:
                     return new TaskPending(spec);
                 case RUNNING:
-                    return new TaskRunning(spec, startedMs, getCombinedStatus(workerStates));
+                    return new TaskRunning(spec, startedMs, getCombinedStatus());
                 case STOPPING:
-                    return new TaskStopping(spec, startedMs, getCombinedStatus(workerStates));
+                    return new TaskStopping(spec, startedMs, getCombinedStatus());
                 case DONE:
-                    return new TaskDone(spec, startedMs, doneMs, error, cancelled, getCombinedStatus(workerStates));
+                    return new TaskDone(spec, startedMs, doneMs, error, cancelled, getCombinedStatus());
             }
             throw new RuntimeException("unreachable");
         }
 
-        TreeSet<String> activeWorkers() {
-            TreeSet<String> workerNames = new TreeSet<>();
-            for (Map.Entry<String, WorkerState> entry : workerStates.entrySet()) {
-                if (!entry.getValue().done()) {
-                    workerNames.add(entry.getKey());
+        private JsonNode getCombinedStatus() {
+            if (workerIds.size() == 1) {
+                return workerStates.get(workerIds.values().iterator().next()).status();
+            } else {
+                ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
+                for (Map.Entry<String, Long> entry : workerIds.entrySet()) {
+                    String nodeName = entry.getKey();
+                    Long workerId = entry.getValue();
+                    WorkerState state = workerStates.get(workerId);
+                    JsonNode node = state.status();
+                    if (node != null) {
+                        objectNode.set(nodeName, node);
+                    }
                 }
+                return objectNode;
             }
-            return workerNames;
         }
-    }
 
-    private static final JsonNode getCombinedStatus(Map<String, WorkerState> states) {
-        if (states.size() == 1) {
-            return states.values().iterator().next().status();
-        } else {
-            ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
-            for (Map.Entry<String, WorkerState> entry : states.entrySet()) {
-                JsonNode node = entry.getValue().status();
-                if (node != null) {
-                    objectNode.set(entry.getKey(), node);
+        TreeMap<String, Long> activeWorkerIds() {
+            TreeMap<String, Long> activeWorkerIds = new TreeMap<>();
+            for (Map.Entry<String, Long> entry : workerIds.entrySet()) {
+                WorkerState workerState = workerStates.get(entry.getValue());
+                if (!workerState.done()) {
+                    activeWorkerIds.put(entry.getKey(), entry.getValue());
                 }
             }
-            return objectNode;
+            return activeWorkerIds;
         }
     }
 
@@ -280,27 +297,21 @@ public final class TaskManager {
      *
      * @param id                    The ID of the task to create.
      * @param spec                  The specification of the task to create.
-     *
-     * @return                      The specification of the task with the given ID.
-     *                              Note that if there was already a task with the given ID,
-     *                              this may be different from the specification that was
-     *                              requested.
      */
-    public TaskSpec createTask(final String id, TaskSpec spec)
-            throws ExecutionException, InterruptedException {
-        final TaskSpec existingSpec = executor.submit(new CreateTask(id, spec)).get();
-        if (existingSpec != null) {
-            log.info("Ignoring request to create task {}, because there is already " +
-                "a task with that id.", id);
-            return existingSpec;
+    public void createTask(final String id, TaskSpec spec)
+            throws Throwable {
+        try {
+            executor.submit(new CreateTask(id, spec)).get();
+        } catch (ExecutionException e) {
+            log.info("createTask(id={}, spec={}) error", id, spec, e);
+            throw e.getCause();
         }
-        return spec;
     }
 
     /**
      * Handles a request to create a new task.  Processed by the state change thread.
      */
-    class CreateTask implements Callable<TaskSpec> {
+    class CreateTask implements Callable<Void> {
         private final String id;
         private final TaskSpec spec;
 
@@ -310,11 +321,18 @@ public final class TaskManager {
         }
 
         @Override
-        public TaskSpec call() throws Exception {
+        public Void call() throws Exception {
+            if (id.isEmpty()) {
+                throw new InvalidRequestException("Invalid empty ID in createTask request.");
+            }
             ManagedTask task = tasks.get(id);
             if (task != null) {
-                log.info("Task ID {} is already in use.", id);
-                return task.spec;
+                if (!task.spec.equals(spec)) {
+                    throw new RequestConflictException("Task ID " + id + " already " +
+                        "exists, and has a different spec " + task.spec);
+                }
+                log.info("Task {} already exists with spec {}", id, spec);
+                return null;
             }
             TaskController controller = null;
             String failure = null;
@@ -374,8 +392,10 @@ public final class TaskManager {
             task.state = ManagedTaskState.RUNNING;
             task.startedMs = time.milliseconds();
             for (String workerName : nodeNames) {
-                task.workerStates.put(workerName, new WorkerReceiving(task.spec));
-                nodeManagers.get(workerName).createWorker(task.id, task.spec);
+                long workerId = nextWorkerId++;
+                task.workerIds.put(workerName, workerId);
+                workerStates.put(workerId, new WorkerReceiving(task.id, task.spec));
+                nodeManagers.get(workerName).createWorker(workerId, task.id, task.spec);
             }
             return null;
         }
@@ -385,18 +405,20 @@ public final class TaskManager {
      * Stop a task.
      *
      * @param id                    The ID of the task to stop.
-     * @return                      The specification of the task which was stopped, or null if there
-     *                              was no task found with the given ID.
      */
-    public TaskSpec stopTask(final String id) throws ExecutionException, InterruptedException {
-        final TaskSpec spec = executor.submit(new CancelTask(id)).get();
-        return spec;
+    public void stopTask(final String id) throws Throwable {
+        try {
+            executor.submit(new CancelTask(id)).get();
+        } catch (ExecutionException e) {
+            log.info("stopTask(id={}) error", id, e);
+            throw e.getCause();
+        }
     }
 
     /**
      * Handles cancelling a task.  Processed by the state change thread.
      */
-    class CancelTask implements Callable<TaskSpec> {
+    class CancelTask implements Callable<Void> {
         private final String id;
 
         CancelTask(String id) {
@@ -404,7 +426,10 @@ public final class TaskManager {
         }
 
         @Override
-        public TaskSpec call() throws Exception {
+        public Void call() throws Exception {
+            if (id.isEmpty()) {
+                throw new InvalidRequestException("Invalid empty ID in stopTask request.");
+            }
             ManagedTask task = tasks.get(id);
             if (task == null) {
                 log.info("Can't cancel non-existent task {}.", id);
@@ -420,16 +445,21 @@ public final class TaskManager {
                     break;
                 case RUNNING:
                     task.cancelled = true;
-                    TreeSet<String> activeWorkers = task.activeWorkers();
-                    if (activeWorkers.isEmpty()) {
-                        log.info("Task {} is now complete with error: {}", id, task.error);
+                    TreeMap<String, Long> activeWorkerIds = task.activeWorkerIds();
+                    if (activeWorkerIds.isEmpty()) {
+                        if (task.error.isEmpty()) {
+                            log.info("Task {} is now complete with no errors.", id);
+                        } else {
+                            log.info("Task {} is now complete with error: {}", id, task.error);
+                        }
                         task.doneMs = time.milliseconds();
                         task.state = ManagedTaskState.DONE;
                     } else {
-                        for (String workerName : activeWorkers) {
-                            nodeManagers.get(workerName).stopWorker(id);
+                        for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
+                            nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
                         }
-                        log.info("Cancelling task {} on worker(s): {}", id, Utils.join(activeWorkers, ", "));
+                        log.info("Cancelling task {} with worker(s) {}",
+                            id, Utils.mkString(activeWorkerIds, "", "", " = ", ", "));
                         task.state = ManagedTaskState.STOPPING;
                     }
                     break;
@@ -440,7 +470,48 @@ public final class TaskManager {
                     log.info("Can't cancel task {} because it is already done.", id);
                     break;
             }
-            return task.spec;
+            return null;
+        }
+    }
+
+    public void destroyTask(String id) throws Throwable {
+        try {
+            executor.submit(new DestroyTask(id)).get();
+        } catch (ExecutionException e) {
+            log.info("destroyTask(id={}) error", id, e);
+            throw e.getCause();
+        }
+    }
+
+    /**
+     * Handles destroying a task.  Processed by the state change thread.
+     */
+    class DestroyTask implements Callable<Void> {
+        private final String id;
+
+        DestroyTask(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            if (id.isEmpty()) {
+                throw new InvalidRequestException("Invalid empty ID in destroyTask request.");
+            }
+            ManagedTask task = tasks.remove(id);
+            if (task == null) {
+                log.info("Can't destroy task {}: no such task found.", id);
+                return null;
+            }
+            log.info("Destroying task {}.", id);
+            task.clearStartFuture();
+            for (Map.Entry<String, Long> entry : task.workerIds.entrySet()) {
+                long workerId = entry.getValue();
+                workerStates.remove(workerId);
+                String nodeName = entry.getKey();
+                nodeManagers.get(nodeName).destroyWorker(workerId);
+            }
+            return null;
         }
     }
 
@@ -448,38 +519,48 @@ public final class TaskManager {
      * Update the state of a particular agent's worker.
      *
      * @param nodeName      The node where the agent is running.
-     * @param id            The worker name.
+     * @param workerId      The worker ID.
      * @param state         The worker state.
      */
-    public void updateWorkerState(String nodeName, String id, WorkerState state) {
-        executor.submit(new UpdateWorkerState(nodeName, id, state));
+    public void updateWorkerState(String nodeName, long workerId, WorkerState state) {
+        executor.submit(new UpdateWorkerState(nodeName, workerId, state));
     }
 
+    /**
+     * Updates the state of a worker.  Processed by the state change thread.
+     */
     class UpdateWorkerState implements Callable<Void> {
         private final String nodeName;
-        private final String id;
-        private final WorkerState state;
+        private final long workerId;
+        private final WorkerState nextState;
 
-        UpdateWorkerState(String nodeName, String id, WorkerState state) {
+        UpdateWorkerState(String nodeName, long workerId, WorkerState nextState) {
             this.nodeName = nodeName;
-            this.id = id;
-            this.state = state;
+            this.workerId = workerId;
+            this.nextState = nextState;
         }
 
         @Override
         public Void call() throws Exception {
-            ManagedTask task = tasks.get(id);
-            if (task == null) {
-                log.error("Can't update worker state unknown worker {} on node {}",
-                    id, nodeName);
-                return null;
-            }
-            WorkerState prevState = task.workerStates.get(nodeName);
-            log.debug("Task {}: Updating worker state for {} from {} to {}.",
-                id, nodeName, prevState, state);
-            task.workerStates.put(nodeName, state);
-            if (state.done() && (!prevState.done())) {
-                handleWorkerCompletion(task, nodeName, (WorkerDone) state);
+            try {
+                WorkerState prevState = workerStates.get(workerId);
+                if (prevState == null) {
+                    throw new RuntimeException("Unable to find workerId " + workerId);
+                }
+                ManagedTask task = tasks.get(prevState.taskId());
+                if (task == null) {
+                    throw new RuntimeException("Unable to find taskId " + prevState.taskId());
+                }
+                log.debug("Task {}: Updating worker state for {} on {} from {} to {}.",
+                    task.id, workerId, nodeName, prevState, nextState);
+                workerStates.put(workerId, nextState);
+                if (nextState.done() && (!prevState.done())) {
+                    handleWorkerCompletion(task, nodeName, (WorkerDone) nextState);
+                }
+            } catch (Exception e) {
+                log.error("Error updating worker state for {} on {}.  Stopping worker.",
+                    workerId, nodeName, e);
+                nodeManagers.get(nodeName).stopWorker(workerId);
             }
             return null;
         }
@@ -501,19 +582,19 @@ public final class TaskManager {
                 nodeName, task.id, state.error(), JsonUtil.toJsonString(state.status()));
             task.maybeSetError(state.error());
         }
-        if (task.activeWorkers().isEmpty()) {
+        TreeMap<String, Long> activeWorkerIds = task.activeWorkerIds();
+        if (activeWorkerIds.isEmpty()) {
             task.doneMs = time.milliseconds();
             task.state = ManagedTaskState.DONE;
             log.info("{}: Task {} is now complete on {} with error: {}",
-                nodeName, task.id, Utils.join(task.workerStates.keySet(), ", "),
+                nodeName, task.id, Utils.join(task.workerIds.keySet(), ", "),
                 task.error.isEmpty() ? "(none)" : task.error);
         } else if ((task.state == ManagedTaskState.RUNNING) && (!task.error.isEmpty())) {
-            TreeSet<String> activeWorkers = task.activeWorkers();
             log.info("{}: task {} stopped with error {}.  Stopping worker(s): {}",
-                nodeName, task.id, task.error, Utils.join(activeWorkers, ", "));
+                nodeName, task.id, task.error, Utils.mkString(activeWorkerIds, "{", "}", ": ", ", "));
             task.state = ManagedTaskState.STOPPING;
-            for (String workerName : activeWorkers) {
-                nodeManagers.get(workerName).stopWorker(task.id);
+            for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
+                nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
             }
         }
     }
@@ -525,6 +606,9 @@ public final class TaskManager {
         return executor.submit(new GetTasksResponse(request)).get();
     }
 
+    /**
+     * Gets information about the tasks being managed.  Processed by the state change thread.
+     */
     class GetTasksResponse implements Callable<TasksResponse> {
         private final TasksRequest request;
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
index c505e75..d41a54b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
@@ -27,13 +27,13 @@ import java.util.TreeMap;
  */
 public class AgentStatusResponse extends Message {
     private final long serverStartMs;
-    private final TreeMap<String, WorkerState> workers;
+    private final TreeMap<Long, WorkerState> workers;
 
     @JsonCreator
     public AgentStatusResponse(@JsonProperty("serverStartMs") long serverStartMs,
-            @JsonProperty("workers") TreeMap<String, WorkerState> workers) {
+            @JsonProperty("workers") TreeMap<Long, WorkerState> workers) {
         this.serverStartMs = serverStartMs;
-        this.workers = workers == null ? new TreeMap<String, WorkerState>() : workers;
+        this.workers = workers == null ? new TreeMap<Long, WorkerState>() : workers;
     }
 
     @JsonProperty
@@ -42,7 +42,7 @@ public class AgentStatusResponse extends Message {
     }
 
     @JsonProperty
-    public TreeMap<String, WorkerState> workers() {
+    public TreeMap<Long, WorkerState> workers() {
         return workers;
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
deleted file mode 100644
index 54ea0f2..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
-
-/**
- * A response from the Trogdor coordinator about creating a task.
- */
-public class CreateTaskResponse extends Message {
-    private final TaskSpec spec;
-
-    @JsonCreator
-    public CreateTaskResponse(@JsonProperty("spec") TaskSpec spec) {
-        this.spec = spec;
-    }
-
-    @JsonProperty
-    public TaskSpec spec() {
-        return spec;
-    }
-}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
index 9f6e8dc..4acc943 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
@@ -25,19 +25,27 @@ import org.apache.kafka.trogdor.task.TaskSpec;
  * A request to the Trogdor agent to create a worker.
  */
 public class CreateWorkerRequest extends Message {
-    private final String id;
+    private final long workerId;
+    private final String taskId;
     private final TaskSpec spec;
 
     @JsonCreator
-    public CreateWorkerRequest(@JsonProperty("id") String id,
+    public CreateWorkerRequest(@JsonProperty("workerId") long workerId,
+            @JsonProperty("taskId") String taskId,
             @JsonProperty("spec") TaskSpec spec) {
-        this.id = id;
+        this.workerId = workerId;
+        this.taskId = taskId;
         this.spec = spec;
     }
 
     @JsonProperty
-    public String id() {
-        return id;
+    public long workerId() {
+        return workerId;
+    }
+
+    @JsonProperty
+    public String taskId() {
+        return taskId;
     }
 
     @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
deleted file mode 100644
index 9e068ec..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
-
-/**
- * A response from the Trogdor agent about creating a worker.
- */
-public class CreateWorkerResponse extends Message {
-    private final TaskSpec spec;
-
-    @JsonCreator
-    public CreateWorkerResponse(@JsonProperty("spec") TaskSpec spec) {
-        this.spec = spec;
-    }
-
-    @JsonProperty
-    public TaskSpec spec() {
-        return spec;
-    }
-}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyTaskRequest.java
similarity index 74%
rename from tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
rename to tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyTaskRequest.java
index 7d5b468..d782d5d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyTaskRequest.java
@@ -19,21 +19,20 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
- * A response from the Trogdor agent about stopping a worker.
+ * A request to the Trogdor coordinator to delete all memory of a task.
  */
-public class StopWorkerResponse extends Message {
-    private final TaskSpec spec;
+public class DestroyTaskRequest extends Message {
+    private final String id;
 
     @JsonCreator
-    public StopWorkerResponse(@JsonProperty("spec") TaskSpec spec) {
-        this.spec = spec;
+    public DestroyTaskRequest(@JsonProperty("id") String id) {
+        this.id = id;
     }
 
     @JsonProperty
-    public TaskSpec spec() {
-        return spec;
+    public String id() {
+        return id;
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyWorkerRequest.java
similarity index 75%
rename from tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
rename to tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyWorkerRequest.java
index f344dc9..e5a8969 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyWorkerRequest.java
@@ -19,21 +19,20 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
 
 /**
- * A response from the Trogdor coordinator about stopping a task.
+ * A request to the Trogdor agent to delete all memory of a worker.
  */
-public class StopTaskResponse extends Message {
-    private final TaskSpec spec;
+public class DestroyWorkerRequest extends Message {
+    private final long workerId;
 
     @JsonCreator
-    public StopTaskResponse(@JsonProperty("spec") TaskSpec spec) {
-        this.spec = spec;
+    public DestroyWorkerRequest(@JsonProperty("workerId") long workerId) {
+        this.workerId = workerId;
     }
 
     @JsonProperty
-    public TaskSpec spec() {
-        return spec;
+    public long workerId() {
+        return workerId;
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/RequestConflictException.java
similarity index 68%
copy from tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
copy to tools/src/main/java/org/apache/kafka/trogdor/rest/RequestConflictException.java
index 3287801..2701f6a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/RequestConflictException.java
@@ -17,22 +17,17 @@
 
 package org.apache.kafka.trogdor.rest;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
 /**
- * A request to the Trogdor agent to stop a task.
+ * Indicates that a given request got an HTTP error 409: CONFLICT.
  */
-public class StopTaskRequest extends Message {
-    private final String id;
+public class RequestConflictException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
 
-    @JsonCreator
-    public StopTaskRequest(@JsonProperty("id") String id) {
-        this.id = id;
+    public RequestConflictException(String message) {
+        super(message);
     }
 
-    @JsonProperty
-    public String id() {
-        return id;
+    public RequestConflictException() {
+        super();
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
index f62a775..57c54ec 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
@@ -18,6 +18,7 @@ package org.apache.kafka.trogdor.rest;
 
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,8 @@ public class RestExceptionMapper implements ExceptionMapper<Throwable> {
         }
         if (e instanceof NotFoundException) {
             return buildResponse(Response.Status.NOT_FOUND, e);
+        } else if (e instanceof InvalidRequestException) {
+            return buildResponse(Response.Status.BAD_REQUEST, e);
         } else if (e instanceof InvalidTypeIdException) {
             return buildResponse(Response.Status.NOT_IMPLEMENTED, e);
         } else if (e instanceof JsonMappingException) {
@@ -46,6 +49,8 @@ public class RestExceptionMapper implements ExceptionMapper<Throwable> {
             return buildResponse(Response.Status.NOT_IMPLEMENTED, e);
         } else if (e instanceof SerializationException) {
             return buildResponse(Response.Status.BAD_REQUEST, e);
+        } else if (e instanceof RequestConflictException) {
+            return buildResponse(Response.Status.CONFLICT, e);
         } else {
             return buildResponse(Response.Status.INTERNAL_SERVER_ERROR, e);
         }
@@ -57,7 +62,9 @@ public class RestExceptionMapper implements ExceptionMapper<Throwable> {
         } else if (code == Response.Status.NOT_IMPLEMENTED.getStatusCode()) {
             throw new ClassNotFoundException(msg);
         } else if (code == Response.Status.BAD_REQUEST.getStatusCode()) {
-            throw new SerializationException(msg);
+            throw new InvalidRequestException(msg);
+        } else if (code == Response.Status.CONFLICT.getStatusCode()) {
+            throw new RequestConflictException(msg);
         } else {
             throw new RuntimeException(msg);
         }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
index 3287801..704a961 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
@@ -28,7 +28,7 @@ public class StopTaskRequest extends Message {
 
     @JsonCreator
     public StopTaskRequest(@JsonProperty("id") String id) {
-        this.id = id;
+        this.id = (id == null) ? "" : id;
     }
 
     @JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
index 54c689a..c1dcff3 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
@@ -24,15 +24,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
  * A request to the Trogdor agent to stop a worker.
  */
 public class StopWorkerRequest extends Message {
-    private final String id;
+    private final long workerId;
 
     @JsonCreator
-    public StopWorkerRequest(@JsonProperty("id") String id) {
-        this.id = id;
+    public StopWorkerRequest(@JsonProperty("workerId") long workerId) {
+        this.workerId = workerId;
     }
 
     @JsonProperty
-    public String id() {
-        return id;
+    public long workerId() {
+        return workerId;
     }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
index 500d3c6..5f773bb 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
@@ -49,12 +49,13 @@ public class WorkerDone extends WorkerState {
     private final String error;
 
     @JsonCreator
-    public WorkerDone(@JsonProperty("spec") TaskSpec spec,
+    public WorkerDone(@JsonProperty("taskId") String taskId,
+            @JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
             @JsonProperty("doneMs") long doneMs,
             @JsonProperty("status") JsonNode status,
             @JsonProperty("error") String error) {
-        super(spec);
+        super(taskId, spec);
         this.startedMs = startedMs;
         this.doneMs = doneMs;
         this.status = status == null ? NullNode.instance : status;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
index 7068774..1babcce 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
@@ -29,8 +29,9 @@ import org.apache.kafka.trogdor.task.TaskSpec;
  */
 public final class WorkerReceiving extends WorkerState {
     @JsonCreator
-    public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) {
-        super(spec);
+    public WorkerReceiving(@JsonProperty("taskId") String taskId,
+            @JsonProperty("spec") TaskSpec spec) {
+        super(taskId, spec);
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
index af8ee88..15e7752 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
@@ -39,10 +39,11 @@ public class WorkerRunning extends WorkerState {
     private final JsonNode status;
 
     @JsonCreator
-    public WorkerRunning(@JsonProperty("spec") TaskSpec spec,
+    public WorkerRunning(@JsonProperty("taskId") String taskId,
+            @JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
             @JsonProperty("status") JsonNode status) {
-        super(spec);
+        super(taskId, spec);
         this.startedMs = startedMs;
         this.status = status == null ? NullNode.instance : status;
     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
index b568ec1..7a06eac 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
@@ -28,8 +28,9 @@ import org.apache.kafka.trogdor.task.TaskSpec;
  */
 public final class WorkerStarting extends WorkerState {
     @JsonCreator
-    public WorkerStarting(@JsonProperty("spec") TaskSpec spec) {
-        super(spec);
+    public WorkerStarting(@JsonProperty("taskId") String taskId,
+            @JsonProperty("spec") TaskSpec spec) {
+        super(taskId, spec);
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
index 044d719..6480a24 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
@@ -38,13 +38,20 @@ import org.apache.kafka.trogdor.task.TaskSpec;
     @JsonSubTypes.Type(value = WorkerDone.class, name = "DONE")
     })
 public abstract class WorkerState extends Message {
+    private final String taskId;
     private final TaskSpec spec;
 
-    public WorkerState(TaskSpec spec) {
+    public WorkerState(String taskId, TaskSpec spec) {
+        this.taskId = taskId;
         this.spec = spec;
     }
 
     @JsonProperty
+    public String taskId() {
+        return taskId;
+    }
+
+    @JsonProperty
     public TaskSpec spec() {
         return spec;
     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
index 9fbb3ff..2942e11 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
@@ -39,10 +39,11 @@ public class WorkerStopping extends WorkerState {
     private final JsonNode status;
 
     @JsonCreator
-    public WorkerStopping(@JsonProperty("spec") TaskSpec spec,
+    public WorkerStopping(@JsonProperty("taskId") String taskId,
+            @JsonProperty("spec") TaskSpec spec,
             @JsonProperty("startedMs") long startedMs,
             @JsonProperty("status") JsonNode status) {
-        super(spec);
+        super(taskId, spec);
         this.startedMs = startedMs;
         this.status = status == null ? NullNode.instance : status;
     }
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 61de5c9..158e690 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -36,8 +36,9 @@ import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
@@ -120,36 +121,47 @@ public class AgentTest {
         new ExpectedTasks().waitFor(client);
 
         final NoOpTaskSpec fooSpec = new NoOpTaskSpec(1000, 600000);
-        CreateWorkerResponse response = client.createWorker(new CreateWorkerRequest("foo", fooSpec));
-        assertEquals(fooSpec.toString(), response.spec().toString());
+        client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
         new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
                 build()).
             waitFor(client);
 
         final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000);
-        client.createWorker(new CreateWorkerRequest("bar", barSpec));
-        client.createWorker(new CreateWorkerRequest("bar", barSpec));
+        client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
+        client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
+
+        try {
+            client.createWorker(new CreateWorkerRequest(1, "foo", barSpec));
+            Assert.fail("Expected RequestConflictException when re-creating a request with a different taskId.");
+        } catch (RequestConflictException exception) {
+        }
+        try {
+            client.createWorker(new CreateWorkerRequest(1, "bar", fooSpec));
+            Assert.fail("Expected RequestConflictException when re-creating a request with a different spec.");
+        } catch (RequestConflictException exception) {
+        }
+
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("bar", barSpec, 0, new TextNode("active"))).
                 build()).
             waitFor(client);
 
         final NoOpTaskSpec bazSpec = new NoOpTaskSpec(1, 450000);
-        client.createWorker(new CreateWorkerRequest("baz", bazSpec));
+        client.createWorker(new CreateWorkerRequest(2, "baz", bazSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("bar", barSpec, 0, new TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("baz").
-                workerState(new WorkerRunning(bazSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("baz", bazSpec, 0, new TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -167,23 +179,23 @@ public class AgentTest {
         new ExpectedTasks().waitFor(client);
 
         final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 2);
-        client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+        client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
                 build()).
             waitFor(client);
 
         time.sleep(1);
 
         final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000);
-        client.createWorker(new CreateWorkerRequest("bar", barSpec));
+        client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))).
+                workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))).
                 build()).
             waitFor(client);
 
@@ -191,21 +203,21 @@ public class AgentTest {
 
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")).
+                workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))).
+                workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))).
                 build()).
             waitFor(client);
 
         time.sleep(5);
-        client.stopWorker(new StopWorkerRequest("bar"));
+        client.stopWorker(new StopWorkerRequest(1));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")).
+                workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerDone(barSpec, 1, 7, new TextNode("done"), "")).
+                workerState(new WorkerDone("bar", barSpec, 1, 7, new TextNode("done"), "")).
                 build()).
             waitFor(client);
 
@@ -224,25 +236,25 @@ public class AgentTest {
 
         SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000,
             Collections.singletonMap("node01", 1L), "");
-        client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+        client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+                workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
                 build()).
             waitFor(client);
 
         SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000,
             Collections.singletonMap("node01", 2L), "baz");
-        client.createWorker(new CreateWorkerRequest("bar", barSpec));
+        client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
 
         time.sleep(1);
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 1,
+                workerState(new WorkerDone("foo", fooSpec, 0, 1,
                     new TextNode("halted"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerRunning(barSpec, 0,
+                workerState(new WorkerRunning("bar", barSpec, 0,
                     new TextNode("active"))).
                 build()).
             waitFor(client);
@@ -250,11 +262,11 @@ public class AgentTest {
         time.sleep(1);
         new ExpectedTasks().
             addTask(new ExpectedTaskBuilder("foo").
-                workerState(new WorkerDone(fooSpec, 0, 1,
+                workerState(new WorkerDone("foo", fooSpec, 0, 1,
                     new TextNode("halted"), "")).
                 build()).
             addTask(new ExpectedTaskBuilder("bar").
-                workerState(new WorkerDone(barSpec, 0, 2,
+                workerState(new WorkerDone("bar", barSpec, 0, 2,
                     new TextNode("halted"), "baz")).
                 build()).
             waitFor(client);
@@ -293,37 +305,84 @@ public class AgentTest {
             Assert.assertEquals(KiboshControlFile.EMPTY, mockKibosh.read());
             FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(0, 900000,
                 Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/foo", 123);
-            client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+            client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("Added fault foo"))).
                     build()).
                 waitFor(client);
             Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
                 new KiboshFilesUnreadableFaultSpec("/foo", 123))), mockKibosh.read());
             FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(0, 900000,
                 Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/bar", 456);
-            client.createWorker(new CreateWorkerRequest("bar", barSpec));
+            client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).build()).
+                    workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("Added fault foo"))).build()).
                 addTask(new ExpectedTaskBuilder("bar").
-                    workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()).
+                    workerState(new WorkerRunning("bar", barSpec, 0, new TextNode("Added fault bar"))).build()).
                 waitFor(client);
             Assert.assertEquals(new KiboshControlFile(new ArrayList<Kibosh.KiboshFaultSpec>() {{
                     add(new KiboshFilesUnreadableFaultSpec("/foo", 123));
                     add(new KiboshFilesUnreadableFaultSpec("/bar", 456));
                 }}), mockKibosh.read());
             time.sleep(1);
-            client.stopWorker(new StopWorkerRequest("foo"));
+            client.stopWorker(new StopWorkerRequest(0));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
-                    workerState(new WorkerDone(fooSpec, 0, 1, new TextNode("Removed fault foo"), "")).build()).
+                    workerState(new WorkerDone("foo", fooSpec, 0, 1, new TextNode("Removed fault foo"), "")).build()).
                 addTask(new ExpectedTaskBuilder("bar").
-                    workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()).
+                    workerState(new WorkerRunning("bar", barSpec, 0, new TextNode("Added fault bar"))).build()).
                 waitFor(client);
             Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
                 new KiboshFilesUnreadableFaultSpec("/bar", 456))), mockKibosh.read());
         }
     }
+
+    @Test
+    public void testDestroyWorkers() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        MockScheduler scheduler = new MockScheduler(time);
+        Agent agent = createAgent(scheduler);
+        AgentClient client = new AgentClient.Builder().
+            maxTries(10).target("localhost", agent.port()).build();
+        new ExpectedTasks().waitFor(client);
+
+        final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 5);
+        client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
+                build()).
+            waitFor(client);
+        time.sleep(1);
+
+        client.destroyWorker(new DestroyWorkerRequest(0));
+        client.destroyWorker(new DestroyWorkerRequest(0));
+        client.destroyWorker(new DestroyWorkerRequest(1));
+        new ExpectedTasks().waitFor(client);
+        time.sleep(1);
+
+        final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(100, 1);
+        client.createWorker(new CreateWorkerRequest(1, "foo", fooSpec2));
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerRunning("foo", fooSpec2, 2, new TextNode("active"))).
+                build()).
+            waitFor(client);
+
+        time.sleep(2);
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerDone("foo", fooSpec2, 2, 4, new TextNode("done"), "")).
+                build()).
+            waitFor(client);
+
+        time.sleep(1);
+        client.destroyWorker(new DestroyWorkerRequest(1));
+        new ExpectedTasks().waitFor(client);
+
+        agent.beginShutdown();
+        agent.waitForShutdown();
+    }
 };
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index 617bf34..121281f 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -32,6 +32,7 @@ import org.apache.kafka.trogdor.task.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -184,10 +185,14 @@ public class ExpectedTasks {
                     throw new RuntimeException(e);
                 }
                 StringBuilder errors = new StringBuilder();
+                HashMap<String, WorkerState> taskIdToWorkerState = new HashMap<>();
+                for (WorkerState state : status.workers().values()) {
+                    taskIdToWorkerState.put(state.taskId(), state);
+                }
                 for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
                     String id = entry.getKey();
                     ExpectedTask worker = entry.getValue();
-                    String differences = worker.compare(status.workers().get(id));
+                    String differences = worker.compare(taskIdToWorkerState.get(id));
                     if (differences != null) {
                         errors.append(differences);
                     }
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 8101d9c..c1f7490 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -45,9 +45,9 @@ public class JsonSerializationTest {
         verify(new ProcessStopFaultSpec(0, 0, null, null));
         verify(new AgentStatusResponse(0, null));
         verify(new TasksResponse(null));
-        verify(new WorkerDone(null, 0, 0, null, null));
-        verify(new WorkerRunning(null, 0, null));
-        verify(new WorkerStopping(null, 0, null));
+        verify(new WorkerDone(null, null, 0, 0, null, null));
+        verify(new WorkerRunning(null, null, 0, null));
+        verify(new WorkerStopping(null, null, 0, null));
         verify(new ProduceBenchSpec(0, 0, null, null,
             0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 3));
         verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index 07f02c5..46315c2 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -185,7 +185,7 @@ public class MiniTrogdorCluster implements AutoCloseable {
                             }
                             if (node.coordinatorRestResource != null) {
                                 node.coordinator = new Coordinator(node.platform, scheduler,
-                                    node.coordinatorRestServer, node.coordinatorRestResource);
+                                    node.coordinatorRestServer, node.coordinatorRestResource, 0);
                             }
                         } catch (Exception e) {
                             log.error("Unable to initialize {}", nodeName, e);
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 34d7ffe..e943484 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -35,6 +35,8 @@ import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
 import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateTaskRequest;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
@@ -57,8 +59,9 @@ import java.util.HashMap;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public class CoordinatorTest {
     private static final Logger log = LoggerFactory.getLogger(CoordinatorTest.class);
@@ -96,11 +99,25 @@ public class CoordinatorTest {
                     build()).
                 waitFor(cluster.coordinatorClient());
 
+            // Re-creating a task with the same arguments is not an error.
+            cluster.coordinatorClient().createTask(
+                new CreateTaskRequest("foo", fooSpec));
+
+            // Re-creating a task with different arguments gives a RequestConflictException.
+            try {
+                NoOpTaskSpec barSpec = new NoOpTaskSpec(1000, 2000);
+                cluster.coordinatorClient().createTask(
+                    new CreateTaskRequest("foo", barSpec));
+                fail("Expected to get an exception when re-creating a task with a " +
+                    "different task spec.");
+            } catch (RequestConflictException exception) {
+            }
+
             time.sleep(2);
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
-                    workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
                     build()).
                 waitFor(cluster.coordinatorClient()).
                 waitFor(cluster.agentClient("node02"));
@@ -149,7 +166,7 @@ public class CoordinatorTest {
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 11, status1)).
-                    workerState(new WorkerRunning(fooSpec, 11,  new TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 11,  new TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -163,7 +180,7 @@ public class CoordinatorTest {
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskDone(fooSpec, 11, 13,
                         "", false, status2)).
-                    workerState(new WorkerDone(fooSpec, 11, 13, new TextNode("done"), "")).
+                    workerState(new WorkerDone("foo", fooSpec, 11, 13, new TextNode("done"), "")).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -206,7 +223,7 @@ public class CoordinatorTest {
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 11, status1)).
-                    workerState(new WorkerRunning(fooSpec, 11, new TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 11, new TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
@@ -221,11 +238,68 @@ public class CoordinatorTest {
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskDone(fooSpec, 11, 12, "",
                         true, status2)).
-                    workerState(new WorkerDone(fooSpec, 11, 12, new TextNode("done"), "")).
+                    workerState(new WorkerDone("foo", fooSpec, 11, 12, new TextNode("done"), "")).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+
+            coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
+            new ExpectedTasks().
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+        }
+    }
+
+    @Test
+    public void testTaskDestruction() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node01").
+            addAgent("node02").
+            scheduler(scheduler).
+            build()) {
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+            AgentClient agentClient1 = cluster.agentClient("node01");
+            AgentClient agentClient2 = cluster.agentClient("node02");
+
+            new ExpectedTasks().
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(2, 2);
+            coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
+            coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
+            NoOpTaskSpec barSpec = new NoOpTaskSpec(20, 20);
+            coordinatorClient.createTask(new CreateTaskRequest("bar", barSpec));
+            coordinatorClient.destroyTask(new DestroyTaskRequest("bar"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()).
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+            time.sleep(10);
+
+            ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+            status1.set("node01", new TextNode("active"));
+            status1.set("node02", new TextNode("active"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 10, status1)).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
+
+            coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
+            new ExpectedTasks().
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
         }
     }
 
@@ -397,7 +471,7 @@ public class CoordinatorTest {
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
-                    workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
                     build()).
                 addTask(new ExpectedTaskBuilder("bar").
                     taskState(new TaskPending(barSpec)).
@@ -448,7 +522,7 @@ public class CoordinatorTest {
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 2, status1)).
-                    workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(cluster.agentClient("node02")).
@@ -461,14 +535,14 @@ public class CoordinatorTest {
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 2, status2)).
-                    workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
                     build()).
                 waitFor(coordinatorClient).
                 waitFor(cluster.agentClient("node03"));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
                     taskState(new TaskRunning(fooSpec, 2, status2)).
-                    workerState(new WorkerDone(fooSpec, 2, 12, new TextNode("halted"), "")).
+                    workerState(new WorkerDone("foo", fooSpec, 2, 12, new TextNode("halted"), "")).
                     build()).
                 waitFor(cluster.agentClient("node02"));
 
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java b/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java
index c40f958..9c7f752 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.Response;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.junit.Test;
 
@@ -68,6 +70,13 @@ public class RestExceptionMapperTest {
     }
 
     @Test
+    public void testToResponseInvalidRequestException() {
+        RestExceptionMapper mapper = new RestExceptionMapper();
+        Response resp = mapper.toResponse(new InvalidRequestException("invalid request"));
+        assertEquals(resp.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
+    }
+
+    @Test
     public void testToResponseUnknownException() {
         RestExceptionMapper mapper = new RestExceptionMapper();
         Response resp = mapper.toResponse(new Exception("Unkown exception"));
@@ -84,7 +93,7 @@ public class RestExceptionMapperTest {
         RestExceptionMapper.toException(Response.Status.NOT_IMPLEMENTED.getStatusCode(), "Not Implemented");
     }
 
-    @Test(expected = SerializationException.class)
+    @Test(expected = InvalidRequestException.class)
     public void testToExceptionSerializationException() throws Exception {
         RestExceptionMapper.toException(Response.Status.BAD_REQUEST.getStatusCode(), "Bad Request");
     }

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message