kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [1/4] kafka git commit: KAFKA-6060; Add workload generation capabilities to Trogdor
Date Fri, 03 Nov 2017 09:38:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e4208b1d5 -> 4fac83ba1


http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
new file mode 100644
index 0000000..3d8323d
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state for a task which is being run by the agent.
+ */
+public class WorkerRunning extends WorkerState {
+    /**
+     * The time on the agent when the task was started.
+     */
+    private final long startedMs;
+
+    /**
+     * The task status.  The format will depend on the type of task that is
+     * being run.
+     */
+    private final String status;
+
+    @JsonCreator
+    public WorkerRunning(@JsonProperty("spec") TaskSpec spec,
+            @JsonProperty("startedMs") long startedMs,
+            @JsonProperty("status") String status) {
+        super(spec);
+        this.startedMs = startedMs;
+        this.status = status;
+    }
+
+    @JsonProperty
+    @Override
+    public long startedMs() {
+        return startedMs;
+    }
+
+    @JsonProperty
+    @Override
+    public String status() {
+        return status;
+    }
+
+    @Override
+    public boolean running() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
new file mode 100644
index 0000000..3a766ea
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * When we have just started a worker.
+ */
+public final class WorkerStarting extends WorkerState {
+    @JsonCreator
+    public WorkerStarting(@JsonProperty("spec") TaskSpec spec) {
+        super(spec);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
new file mode 100644
index 0000000..6d7c687
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state which a worker is in on the Agent.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.PROPERTY,
+    property = "state")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(value = WorkerReceiving.class, name = "RECEIVING"),
+    @JsonSubTypes.Type(value = WorkerStarting.class, name = "STARTING"),
+    @JsonSubTypes.Type(value = WorkerRunning.class, name = "RUNNING"),
+    @JsonSubTypes.Type(value = WorkerStopping.class, name = "STOPPING"),
+    @JsonSubTypes.Type(value = WorkerDone.class, name = "DONE")
+    })
+public abstract class WorkerState extends Message {
+    private final TaskSpec spec;
+
+    public WorkerState(TaskSpec spec) {
+        this.spec = spec;
+    }
+
+    @JsonProperty
+    public TaskSpec spec() {
+        return spec;
+    }
+
+    public boolean stopping() {
+        return false;
+    }
+
+    public boolean done() {
+        return false;
+    }
+
+    public long startedMs() {
+        throw new KafkaException("invalid state");
+    }
+
+    public String status() {
+        throw new KafkaException("invalid state");
+    }
+
+    public boolean running() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
new file mode 100644
index 0000000..fa2d546
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state for a worker which is being stopped on the agent.
+ */
+public class WorkerStopping extends WorkerState {
+    /**
+     * The time on the agent when the task was started.
+     */
+    private final long startedMs;
+
+    /**
+     * The task status.  The format will depend on the type of task that is
+     * being run.
+     */
+    private final String status;
+
+    @JsonCreator
+    public WorkerStopping(@JsonProperty("spec") TaskSpec spec,
+            @JsonProperty("startedMs") long startedMs,
+            @JsonProperty("status") String status) {
+        super(spec);
+        this.startedMs = startedMs;
+        this.status = status;
+    }
+
+    @JsonProperty
+    @Override
+    public long startedMs() {
+        return startedMs;
+    }
+
+    @JsonProperty
+    @Override
+    public String status() {
+        return status;
+    }
+
+    @Override
+    public boolean stopping() {
+        return true;
+    }
+
+    @Override
+    public boolean running() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskController.java b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskController.java
new file mode 100644
index 0000000..b5906c3
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskController.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import org.apache.kafka.trogdor.common.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+public class NoOpTaskController implements TaskController {
+    private static final Logger log = LoggerFactory.getLogger(NoOpTaskController.class);
+
+    public NoOpTaskController() {
+    }
+
+    @Override
+    public Set<String> targetNodes(Topology topology) {
+        return Topology.Util.agentNodeNames(topology);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskSpec.java
new file mode 100644
index 0000000..63e6023
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskSpec.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The specification for a task that does nothing.
+ *
+ * This task type exists to test Trogdor itself.
+ */
+public class NoOpTaskSpec extends TaskSpec {
+    @JsonCreator
+    public NoOpTaskSpec(@JsonProperty("startMs") long startMs,
+                         @JsonProperty("durationMs") long durationMs) {
+        super(startMs, durationMs);
+    }
+
+    @Override
+    public TaskController newController(String id) {
+        return new NoOpTaskController();
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new NoOpTaskWorker(id);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
new file mode 100644
index 0000000..dfa8084
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.trogdor.common.Platform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NoOpTaskWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(NoOpTaskWorker.class);
+
+    private final String id;
+
+    public NoOpTaskWorker(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public void start(Platform platform, AtomicReference<String> status,
+                      KafkaFutureImpl<String> errorFuture) throws Exception {
+        log.info("{}: Activating NoOpTask.", id);
+    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        log.info("{}: Deactivating NoOpTask.", id);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/task/TaskController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskController.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskController.java
new file mode 100644
index 0000000..dbd0b09
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskController.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import org.apache.kafka.trogdor.common.Topology;
+
+import java.util.Set;
+
+/**
+ * Controls a Trogdor task.
+ */
+public interface TaskController {
+    /**
+     * Get the agent nodes which this task is targeting.
+     *
+     * @param topology      The topology to use.
+     *
+     * @return              A set of target node names.
+     */
+    Set<String> targetNodes(Topology topology);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
new file mode 100644
index 0000000..84ed75a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.util.Objects;
+
+
+/**
+ * The specification for a task. This should be immutable and suitable for serializing and sending over the wire.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS,
+              include = JsonTypeInfo.As.PROPERTY,
+              property = "class")
+public abstract class TaskSpec {
+    /**
+     * The maximum task duration.
+     *
+     * We cap the task duration at this value to avoid worrying about 64-bit overflow or floating
+     * point rounding.  (Objects serialized as JSON canonically contain only floating point numbers,
+     * because JavaScript did not support integers.)
+     */
+    public final static long MAX_TASK_DURATION_MS = 1000000000000000L;
+
+    /**
+     * When the task should start in milliseconds.
+     */
+    private final long startMs;
+
+    /**
+     * How long the task should run in milliseconds.
+     */
+    private final long durationMs;
+
+    protected TaskSpec(@JsonProperty("startMs") long startMs,
+            @JsonProperty("durationMs") long durationMs) {
+        this.startMs = startMs;
+        this.durationMs = Math.max(0, Math.min(durationMs, MAX_TASK_DURATION_MS));
+    }
+
+    /**
+     * Get the target start time of this task in ms.
+     */
+    @JsonProperty
+    public final long startMs() {
+        return startMs;
+    }
+
+    /**
+     * Get the duration of this task in ms.
+     */
+    @JsonProperty
+    public final long durationMs() {
+        return durationMs;
+    }
+
+    /**
+     * Hydrate this task on the coordinator.
+     *
+     * @param id        The task id.
+     */
+    public abstract TaskController newController(String id);
+
+    /**
+     * Hydrate this task on the agent.
+     *
+     * @param id        The worker id.
+     */
+    public abstract TaskWorker newTaskWorker(String id);
+
+    @Override
+    public final boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        return toString().equals(o.toString());
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hashCode(toString());
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtil.toJsonString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
new file mode 100644
index 0000000..288eb9c
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.trogdor.common.Platform;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The agent-side interface for implementing tasks.
+ */
+public interface TaskWorker {
+    /**
+     * Starts the TaskWorker.
+     *
+     * We do not hold any locks or block the WorkerManager state machine on this call.
+     * However, createTask requests to the agent call this function directly.
+     * Therefore, your start() implementation may take a little while, but not too long.
+     * While you can perform short blocking tasks in this function, it is better to
+     * start a background thread to do something time-consuming.
+     *
+     * If the start() function throws an exception, the Agent will assume that the TaskWorker
+     * never started.  Therefore, stop() will never be invoked.  On the other hand, if the
+     * haltFuture is completed, either by a background task or by the start function itself,
+     * the Agent will invoke the stop() method to clean up the worker.
+     *
+     *
+     * @param platform          The platform to use.
+     * @param status            The current status string.  The TaskWorker can update
+     *                          this at any time to provide an updated status.
+     * @param haltFuture        A future which the worker should complete if it halts.
+     *                          If it is completed with an empty string, that means the task
+     *                          halted with no error.  Otherwise, the string is treated as the error.
+     *                          If you start a background thread, you may pass haltFuture
+     *                          to that thread.  Then, the thread can use this future to indicate
+     *                          that the worker should be stopped.
+     *
+     * @throws Exception        If the TaskWorker failed to start.  stop() will not be invoked.
+     */
+    void start(Platform platform, AtomicReference<String> status, KafkaFutureImpl<String> haltFuture)
+        throws Exception;
+
+    /**
+     * Stops the TaskWorker.
+     *
+     * A TaskWorker may be stopped because it has run for its assigned duration, or because a
+     * request arrived instructing the Agent to stop the worker.  The TaskWorker will
+     * also be stopped if haltFuture was completed to indicate that there was an error.
+     *
+     * Regardless of why the TaskWorker was stopped, the stop() function should release all
+     * resources and stop all threads before returning.  The stop() function can block for
+     * as long as it wants.  It is run in a background thread which will not block other
+     * agent operations.  All tasks will be stopped when the Agent cleanly shuts down.
+     *
+     * @param platform          The platform to use.
+     *
+     * @throws Exception        If there was an error cleaning up the TaskWorker.
+     *                          If there is no existing TaskWorker error, the worker will be
+     *                          treated as having failed with the given error.
+     */
+    void stop(Platform platform) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 53ef849..342fefc 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -17,21 +17,25 @@
 
 package org.apache.kafka.trogdor.agent;
 
+import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.trogdor.basic.BasicNode;
 import org.apache.kafka.trogdor.basic.BasicPlatform;
 import org.apache.kafka.trogdor.basic.BasicTopology;
-import org.apache.kafka.trogdor.common.ExpectedFaults;
+import org.apache.kafka.trogdor.common.ExpectedTasks;
+import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
 import org.apache.kafka.trogdor.common.Node;
-import org.apache.kafka.trogdor.fault.DoneState;
-import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
-import org.apache.kafka.trogdor.fault.RunningState;
-import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
-import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
 
+import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
+import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
+import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.WorkerDone;
+import org.apache.kafka.trogdor.rest.WorkerRunning;
+import org.apache.kafka.trogdor.task.NoOpTaskSpec;
+import org.apache.kafka.trogdor.task.SampleTaskSpec;
 import org.junit.Rule;
 import org.junit.rules.Timeout;
 import org.junit.Test;
@@ -46,73 +50,91 @@ public class AgentTest {
     @Rule
     final public Timeout globalTimeout = Timeout.millis(120000);
 
-    private static BasicPlatform createBasicPlatform() {
+    private static BasicPlatform createBasicPlatform(Scheduler scheduler) {
         TreeMap<String, Node> nodes = new TreeMap<>();
         HashMap<String, String> config = new HashMap<>();
         nodes.put("node01", new BasicNode("node01", "localhost",
             config, Collections.<String>emptySet()));
         BasicTopology topology = new BasicTopology(nodes);
-        return new BasicPlatform("node01", topology, new BasicPlatform.ShellCommandRunner());
+        return new BasicPlatform("node01", topology,
+            scheduler, new BasicPlatform.ShellCommandRunner());
     }
 
-    private Agent createAgent(Time time) {
+    private Agent createAgent(Scheduler scheduler) {
         JsonRestServer restServer = new JsonRestServer(0);
         AgentRestResource resource = new AgentRestResource();
         restServer.start(resource);
-        return new Agent(createBasicPlatform(), time, restServer, resource);
+        return new Agent(createBasicPlatform(scheduler), scheduler,
+                restServer, resource);
     }
 
     @Test
     public void testAgentStartShutdown() throws Exception {
-        Agent agent = createAgent(Time.SYSTEM);
+        Agent agent = createAgent(Scheduler.SYSTEM);
         agent.beginShutdown();
         agent.waitForShutdown();
     }
 
     @Test
     public void testAgentProgrammaticShutdown() throws Exception {
-        Agent agent = createAgent(Time.SYSTEM);
-        AgentClient client = new AgentClient("localhost", agent.port());
+        Agent agent = createAgent(Scheduler.SYSTEM);
+        AgentClient client = new AgentClient(10, "localhost", agent.port());
         client.invokeShutdown();
         agent.waitForShutdown();
     }
 
     @Test
     public void testAgentGetStatus() throws Exception {
-        Agent agent = createAgent(Time.SYSTEM);
-        AgentClient client = new AgentClient("localhost", agent.port());
-        AgentStatusResponse status = client.getStatus();
-        assertEquals(agent.startTimeMs(), status.startTimeMs());
+        Agent agent = createAgent(Scheduler.SYSTEM);
+        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        AgentStatusResponse status = client.status();
+        assertEquals(agent.status(), status);
         agent.beginShutdown();
         agent.waitForShutdown();
     }
 
     @Test
-    public void testAgentCreateFaults() throws Exception {
-        Time time = new MockTime(0, 0, 0);
-        Agent agent = createAgent(time);
-        AgentClient client = new AgentClient("localhost", agent.port());
-        AgentFaultsResponse faults = client.getFaults();
-        assertEquals(Collections.emptyMap(), faults.faults());
-        new ExpectedFaults().waitFor(client);
-
-        final NoOpFaultSpec fooSpec = new NoOpFaultSpec(1000, 600000);
-        client.putFault(new CreateAgentFaultRequest("foo", fooSpec));
-        new ExpectedFaults().addFault("foo", fooSpec).waitFor(client);
-
-        final NoOpFaultSpec barSpec = new NoOpFaultSpec(2000, 900000);
-        client.putFault(new CreateAgentFaultRequest("bar", barSpec));
-        new ExpectedFaults().
-            addFault("foo", fooSpec).
-            addFault("bar", barSpec).
+    public void testAgentCreateWorkers() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        MockScheduler scheduler = new MockScheduler(time);
+        Agent agent = createAgent(scheduler);
+        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        AgentStatusResponse status = client.status();
+        assertEquals(Collections.emptyMap(), status.workers());
+        new ExpectedTasks().waitFor(client);
+
+        final NoOpTaskSpec fooSpec = new NoOpTaskSpec(1000, 600000);
+        CreateWorkerResponse response = client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+        assertEquals(fooSpec.toString(), response.spec().toString());
+        new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerRunning(fooSpec, 0, "")).
+                build()).
             waitFor(client);
 
-        final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 450000);
-        client.putFault(new CreateAgentFaultRequest("baz", bazSpec));
-        new ExpectedFaults().
-            addFault("foo", fooSpec).
-            addFault("bar", barSpec).
-            addFault("baz", bazSpec).
+        final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000);
+        client.createWorker(new CreateWorkerRequest("bar", barSpec));
+        client.createWorker(new CreateWorkerRequest("bar", barSpec));
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerRunning(fooSpec, 0, "")).
+                build()).
+            addTask(new ExpectedTaskBuilder("bar").
+                workerState(new WorkerRunning(barSpec, 0, "")).
+                build()).
+            waitFor(client);
+
+        final NoOpTaskSpec bazSpec = new NoOpTaskSpec(1, 450000);
+        client.createWorker(new CreateWorkerRequest("baz", bazSpec));
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerRunning(fooSpec, 0, "")).
+                build()).
+            addTask(new ExpectedTaskBuilder("bar").
+                workerState(new WorkerRunning(barSpec, 0, "")).
+                build()).
+            addTask(new ExpectedTaskBuilder("baz").
+                workerState(new WorkerRunning(bazSpec, 0, "")).
+                build()).
             waitFor(client);
 
         agent.beginShutdown();
@@ -120,45 +142,97 @@ public class AgentTest {
     }
 
     @Test
-    public void testAgentActivatesFaults() throws Exception {
-        Time time = new MockTime(0, 0, 0);
-        Agent agent = createAgent(time);
-        AgentClient client = new AgentClient("localhost", agent.port());
-        AgentFaultsResponse faults = client.getFaults();
-        assertEquals(Collections.emptyMap(), faults.faults());
-        new ExpectedFaults().waitFor(client);
-
-        final NoOpFaultSpec fooSpec = new NoOpFaultSpec(10, 2);
-        client.putFault(new CreateAgentFaultRequest("foo", fooSpec));
-        new ExpectedFaults().addFault("foo", new RunningState(0)).waitFor(client);
-
-        time.sleep(3);
-        new ExpectedFaults().addFault("foo", new DoneState(3, "")).waitFor(client);
-
-        final NoOpFaultSpec barSpec = new NoOpFaultSpec(20, 3);
-        client.putFault(new CreateAgentFaultRequest("bar", barSpec));
-        new ExpectedFaults().
-            addFault("foo", new DoneState(3, "")).
-            addFault("bar", new RunningState(3)).
+    public void testAgentFinishesTasks() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        MockScheduler scheduler = new MockScheduler(time);
+        Agent agent = createAgent(scheduler);
+        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        new ExpectedTasks().waitFor(client);
+
+        final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 2);
+        client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerRunning(fooSpec, 0, "")).
+                build()).
             waitFor(client);
 
-        time.sleep(4);
-        final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 2);
-        client.putFault(new CreateAgentFaultRequest("baz", bazSpec));
-        new ExpectedFaults().
-            addFault("foo", new DoneState(3, "")).
-            addFault("bar", new DoneState(7, "")).
-            addFault("baz", new RunningState(7)).
+        time.sleep(1);
+
+        final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000);
+        client.createWorker(new CreateWorkerRequest("bar", barSpec));
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerRunning(fooSpec, 0, "")).
+                build()).
+            addTask(new ExpectedTaskBuilder("bar").
+                workerState(new WorkerRunning(barSpec, 1, "")).
+                build()).
             waitFor(client);
 
-        time.sleep(3);
-        new ExpectedFaults().
-            addFault("foo", new DoneState(3, "")).
-            addFault("bar", new DoneState(7, "")).
-            addFault("baz", new DoneState(10, "")).
+        time.sleep(1);
+
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerDone(fooSpec, 0, 2, "", "")).
+                build()).
+            addTask(new ExpectedTaskBuilder("bar").
+                workerState(new WorkerRunning(barSpec, 1, "")).
+                build()).
+            waitFor(client);
+
+        time.sleep(5);
+        client.stopWorker(new StopWorkerRequest("bar"));
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerDone(fooSpec, 0, 2, "", "")).
+                build()).
+            addTask(new ExpectedTaskBuilder("bar").
+                workerState(new WorkerDone(barSpec, 1, 7, "", "")).
+                build()).
             waitFor(client);
 
         agent.beginShutdown();
         agent.waitForShutdown();
     }
+
+    @Test
+    public void testWorkerCompletions() throws Exception {
+        // Advances the mock clock so the "foo" and then the "bar" worker report WorkerDone, then shuts
+        // the agent down, as the other tests in this class do, so it is not left running afterwards.
+        MockTime time = new MockTime(0, 0, 0);
+        MockScheduler scheduler = new MockScheduler(time);
+        Agent agent = createAgent(scheduler);
+        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        new ExpectedTasks().waitFor(client);
+
+        SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000, 1, "");
+        client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerRunning(fooSpec, 0, "")).build()).
+            waitFor(client);
+
+        SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000, 2, "baz");
+        client.createWorker(new CreateWorkerRequest("bar", barSpec));
+
+        time.sleep(1);
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerDone(fooSpec, 0, 1, "", "")).build()).
+            addTask(new ExpectedTaskBuilder("bar").
+                workerState(new WorkerRunning(barSpec, 0, "")).build()).
+            waitFor(client);
+
+        time.sleep(1);
+        new ExpectedTasks().
+            addTask(new ExpectedTaskBuilder("foo").
+                workerState(new WorkerDone(fooSpec, 0, 1, "", "")).build()).
+            addTask(new ExpectedTaskBuilder("bar").
+                workerState(new WorkerDone(barSpec, 0, 2, "", "baz")).build()).
+            waitFor(client);
+
+        agent.beginShutdown();
+        agent.waitForShutdown();
+    }
 };

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java
deleted file mode 100644
index 1fab903..0000000
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedFaults.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.common;
-
-import org.apache.kafka.test.TestCondition;
-import org.apache.kafka.test.TestUtils;
-import org.apache.kafka.trogdor.agent.AgentClient;
-import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
-import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.FaultState;
-import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
-import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-public class ExpectedFaults {
-    private static final Logger log = LoggerFactory.getLogger(ExpectedFaults.class);
-
-    private static class FaultData {
-        final FaultSpec spec;
-        final FaultState state;
-
-        FaultData(FaultSpec spec, FaultState state) {
-            this.spec = spec;
-            this.state = state;
-        }
-    }
-
-    private interface FaultFetcher {
-        TreeMap<String, FaultData> fetch() throws Exception;
-    }
-
-    private static class AgentFaultFetcher implements FaultFetcher {
-        private final AgentClient client;
-
-        AgentFaultFetcher(AgentClient client) {
-            this.client = client;
-        }
-
-        @Override
-        public TreeMap<String, FaultData> fetch() throws Exception {
-            TreeMap<String, FaultData> results = new TreeMap<>();
-            AgentFaultsResponse response = client.getFaults();
-            for (Map.Entry<String, AgentFaultsResponse.FaultData> entry :
-                    response.faults().entrySet()) {
-                results.put(entry.getKey(),
-                    new FaultData(entry.getValue().spec(), entry.getValue().state()));
-            }
-            return results;
-        }
-    }
-
-    private static class CoordinatorFaultFetcher implements FaultFetcher {
-        private final CoordinatorClient client;
-
-        CoordinatorFaultFetcher(CoordinatorClient client) {
-            this.client = client;
-        }
-
-        @Override
-        public TreeMap<String, FaultData> fetch() throws Exception {
-            TreeMap<String, FaultData> results = new TreeMap<>();
-            CoordinatorFaultsResponse response = client.getFaults();
-            for (Map.Entry<String, CoordinatorFaultsResponse.FaultData> entry :
-                response.faults().entrySet()) {
-                results.put(entry.getKey(),
-                    new FaultData(entry.getValue().spec(), entry.getValue().state()));
-            }
-            return results;
-        }
-    }
-
-    private final TreeMap<String, FaultData> expected = new TreeMap<String, FaultData>();
-
-    public ExpectedFaults addFault(String id, FaultSpec spec) {
-        expected.put(id, new FaultData(spec, null));
-        return this;
-    }
-
-    public ExpectedFaults addFault(String id, FaultState state) {
-        expected.put(id, new FaultData(null, state));
-        return this;
-    }
-
-    public ExpectedFaults addFault(String id, FaultSpec spec, FaultState state) {
-        expected.put(id, new FaultData(spec, state));
-        return this;
-    }
-
-    public ExpectedFaults waitFor(AgentClient agentClient) throws InterruptedException {
-        waitFor(new AgentFaultFetcher(agentClient));
-        return this;
-    }
-
-    public ExpectedFaults waitFor(CoordinatorClient client) throws InterruptedException {
-        waitFor(new CoordinatorFaultFetcher(client));
-        return this;
-    }
-
-    private void waitFor(final FaultFetcher faultFetcher) throws InterruptedException {
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                TreeMap<String, FaultData> curData = null;
-                try {
-                    curData = faultFetcher.fetch();
-                } catch (Exception e) {
-                    log.info("Got error fetching faults", e);
-                    throw new RuntimeException(e);
-                }
-                StringBuilder errors = new StringBuilder();
-                for (Map.Entry<String, FaultData> entry : expected.entrySet()) {
-                    String id = entry.getKey();
-                    FaultData expectedFaultData = entry.getValue();
-                    FaultData curFaultData = curData.get(id);
-                    if (curFaultData == null) {
-                        errors.append("Did not find fault id " + id + "\n");
-                    } else {
-                        if (expectedFaultData.spec != null) {
-                            if (!expectedFaultData.spec.equals(curFaultData.spec)) {
-                                errors.append("For fault id " + id + ", expected fault " +
-                                    "spec " + expectedFaultData.spec + ", but got " +
-                                    curFaultData.spec + "\n");
-                            }
-                        }
-                        if (expectedFaultData.state != null) {
-                            if (!expectedFaultData.state.equals(curFaultData.state)) {
-                                errors.append("For fault id " + id + ", expected fault " +
-                                    "state " + expectedFaultData.state + ", but got " +
-                                    curFaultData.state + "\n");
-                            }
-                        }
-                    }
-                }
-                for (String id : curData.keySet()) {
-                    if (expected.get(id) == null) {
-                        errors.append("Got unexpected fault id " + id + "\n");
-                    }
-                }
-                String errorString = errors.toString();
-                if (!errorString.isEmpty()) {
-                    log.info("EXPECTED FAULTS: {}", faultsToString(expected));
-                    log.info("ACTUAL FAULTS  : {}", faultsToString(curData));
-                    log.info(errorString);
-                    return false;
-                }
-                return true;
-            }
-        }, "Timed out waiting for expected fault specs " + faultsToString(expected));
-    }
-
-    private static String faultsToString(TreeMap<String, FaultData> faults) {
-        StringBuilder bld = new StringBuilder();
-        bld.append("{");
-        String faultsPrefix = "";
-        for (Map.Entry<String, FaultData> entry : faults.entrySet()) {
-            String id = entry.getKey();
-            bld.append(faultsPrefix).append(id).append(": {");
-            faultsPrefix = ", ";
-            String faultValuesPrefix = "";
-            FaultData faultData = entry.getValue();
-            if (faultData.spec != null) {
-                bld.append(faultValuesPrefix).append("spec: ").append(faultData.spec);
-                faultValuesPrefix = ", ";
-            }
-            if (faultData.state != null) {
-                bld.append(faultValuesPrefix).append("state: ").append(faultData.state);
-                faultValuesPrefix = ", ";
-            }
-            bld.append("}");
-        }
-        bld.append("}");
-        return bld.toString();
-    }
-};

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
new file mode 100644
index 0000000..f72779f
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.trogdor.agent.AgentClient;
+import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
+import org.apache.kafka.trogdor.rest.AgentStatusResponse;
+import org.apache.kafka.trogdor.rest.TaskState;
+import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.rest.WorkerState;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class ExpectedTasks {
+    private static final Logger log = LoggerFactory.getLogger(ExpectedTasks.class);
+
+    private final TreeMap<String, ExpectedTask> expected = new TreeMap<>();
+
+    public static class ExpectedTaskBuilder {
+        private final String id;
+        private TaskSpec taskSpec = null;
+        private TaskState taskState = null;
+        private WorkerState workerState = null;
+
+        public ExpectedTaskBuilder(String id) {
+            this.id = id;
+        }
+
+        public ExpectedTaskBuilder taskSpec(TaskSpec taskSpec) {
+            this.taskSpec = taskSpec;
+            return this;
+        }
+
+        public ExpectedTaskBuilder taskState(TaskState taskState) {
+            this.taskState = taskState;
+            return this;
+        }
+
+        public ExpectedTaskBuilder workerState(WorkerState workerState) {
+            this.workerState = workerState;
+            return this;
+        }
+
+        public ExpectedTask build() {
+            return new ExpectedTask(id, taskSpec, taskState, workerState);
+        }
+    }
+
+    static class ExpectedTask {
+        private final String id;
+        private final TaskSpec taskSpec;
+        private final TaskState taskState;
+        private final WorkerState workerState;
+
+        @JsonCreator
+        private ExpectedTask(@JsonProperty("id") String id,
+                     @JsonProperty("taskSpec") TaskSpec taskSpec,
+                     @JsonProperty("taskState") TaskState taskState,
+                     @JsonProperty("workerState") WorkerState workerState) {
+            this.id = id;
+            this.taskSpec = taskSpec;
+            this.taskState = taskState;
+            this.workerState = workerState;
+        }
+
+        // Compares against a coordinator-side TaskState. Returns null on a match (expectations left
+        // unset are skipped); otherwise a newline-terminated message, so waitFor can concatenate them.
+        String compare(TaskState actual) {
+            if (actual == null) {
+                return "Did not find task " + id + "\n";
+            }
+            if ((taskSpec != null) && (!actual.spec().equals(taskSpec))) {
+                return "Invalid spec for task " + id + ": expected " + taskSpec + ", got " + actual.spec() + "\n";
+            }
+            if ((taskState != null) && (!actual.equals(taskState))) {
+                return "Invalid state for task " + id + ": expected " + taskState + ", got " + actual + "\n";
+            }
+            return null;
+        }
+
+        // Returns null on a match or when no worker state is expected; else a newline-terminated error.
+        String compare(WorkerState actual) {
+            if ((workerState != null) && (!workerState.equals(actual))) {
+                if (actual == null) {
+                    return "Did not find worker " + id + "\n";
+                }
+                return "Invalid state for worker " + id + ": expected " + workerState + ", got " + actual + "\n";
+            }
+            return null;
+        }
+
+        @JsonProperty
+        public String id() {
+            return id;
+        }
+
+        @JsonProperty
+        public TaskSpec taskSpec() {
+            return taskSpec;
+        }
+
+        @JsonProperty
+        public TaskState taskState() {
+            return taskState;
+        }
+
+        @JsonProperty
+        public WorkerState workerState() {
+            return workerState;
+        }
+    }
+
+    public ExpectedTasks addTask(ExpectedTask task) {
+        expected.put(task.id, task);
+        return this;
+    }
+
+    public ExpectedTasks waitFor(final CoordinatorClient client) throws InterruptedException {
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                TasksResponse tasks = null;
+                try {
+                    tasks = client.tasks();
+                } catch (Exception e) {
+                    log.info("Unable to get coordinator tasks", e);
+                    throw new RuntimeException(e);
+                }
+                StringBuilder errors = new StringBuilder();
+                for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
+                    String id = entry.getKey();
+                    ExpectedTask task = entry.getValue();
+                    String differences = task.compare(tasks.tasks().get(id));
+                    if (differences != null) {
+                        errors.append(differences);
+                    }
+                }
+                String errorString = errors.toString();
+                if (!errorString.isEmpty()) {
+                    log.info("EXPECTED TASKS: {}", JsonUtil.toJsonString(expected));
+                    log.info("ACTUAL TASKS  : {}", JsonUtil.toJsonString(tasks.tasks()));
+                    log.info(errorString);
+                    return false;
+                }
+                return true;
+            }
+        }, "Timed out waiting for expected tasks " + JsonUtil.toJsonString(expected));
+        return this;
+    }
+
+    public ExpectedTasks waitFor(final AgentClient client) throws InterruptedException {
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                AgentStatusResponse status = null;
+                try {
+                    status = client.status();
+                } catch (Exception e) {
+                    log.info("Unable to get agent status", e);
+                    throw new RuntimeException(e);
+                }
+                StringBuilder errors = new StringBuilder();
+                for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
+                    String id = entry.getKey();
+                    ExpectedTask worker = entry.getValue();
+                    String differences = worker.compare(status.workers().get(id));
+                    if (differences != null) {
+                        errors.append(differences);
+                    }
+                }
+                String errorString = errors.toString();
+                if (!errorString.isEmpty()) {
+                    log.info("EXPECTED WORKERS: {}", JsonUtil.toJsonString(expected));
+                    log.info("ACTUAL WORKERS  : {}", JsonUtil.toJsonString(status.workers()));
+                    log.info(errorString);
+                    return false;
+                }
+                return true;
+            }
+        }, "Timed out waiting for expected workers " + JsonUtil.toJsonString(expected));
+        return this;
+    }
+};

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index 1947b79..b180c02 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.trogdor.common;
 
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.agent.Agent;
 import org.apache.kafka.trogdor.agent.AgentClient;
@@ -38,6 +38,11 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * MiniTrogdorCluster sets up a local cluster of Trogdor Agents and Coordinators.
@@ -53,7 +58,7 @@ public class MiniTrogdorCluster implements AutoCloseable {
 
         private String coordinatorName = null;
 
-        private Time time = Time.SYSTEM;
+        private Scheduler scheduler = Scheduler.SYSTEM;
 
         private BasicPlatform.CommandRunner commandRunner =
                 new BasicPlatform.ShellCommandRunner();
@@ -81,8 +86,8 @@ public class MiniTrogdorCluster implements AutoCloseable {
         /**
          * Set the timekeeper used by this MiniTrogdorCluster.
          */
-        public Builder time(Time time) {
-            this.time = time;
+        public Builder scheduler(Scheduler scheduler) {
+            this.scheduler = scheduler;
             return this;
         }
 
@@ -126,7 +131,7 @@ public class MiniTrogdorCluster implements AutoCloseable {
         /**
          * Create the MiniTrogdorCluster.
          */
-        public MiniTrogdorCluster build() {
+        public MiniTrogdorCluster build() throws Exception {
             log.info("Creating MiniTrogdorCluster with agents: {} and coordinator: {}",
                 Utils.join(agentNames, ", "), coordinatorName);
             TreeMap<String, NodeData> nodes = new TreeMap<>();
@@ -158,20 +163,41 @@ public class MiniTrogdorCluster implements AutoCloseable {
             for (Map.Entry<String, NodeData> entry : nodes.entrySet()) {
                 topologyNodes.put(entry.getKey(), entry.getValue().node);
             }
-            BasicTopology topology = new BasicTopology(topologyNodes);
-            for (Map.Entry<String, NodeData> entry : nodes.entrySet()) {
-                String nodeName = entry.getKey();
-                NodeData node = entry.getValue();
-                node.platform = new BasicPlatform(nodeName, topology, commandRunner);
-                if (node.agentRestResource != null) {
-                    node.agent = new Agent(node.platform, time, node.agentRestServer,
-                        node.agentRestResource);
-                }
-                if (node.coordinatorRestResource != null) {
-                    node.coordinator = new Coordinator(node.platform, time,
-                        node.coordinatorRestServer, node.coordinatorRestResource);
-                }
+            final BasicTopology topology = new BasicTopology(topologyNodes);
+            ScheduledExecutorService executor = Executors.newScheduledThreadPool(1,
+                ThreadUtils.createThreadFactory("MiniTrogdorClusterStartupThread%d", false));
+            final AtomicReference<Exception> failure = new AtomicReference<Exception>(null);
+            for (final Map.Entry<String, NodeData> entry : nodes.entrySet()) {
+                executor.submit(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        String nodeName = entry.getKey();
+                        try {
+                            NodeData node = entry.getValue();
+                            node.platform = new BasicPlatform(nodeName, topology, scheduler, commandRunner);
+                            if (node.agentRestResource != null) {
+                                node.agent = new Agent(node.platform, scheduler, node.agentRestServer,
+                                    node.agentRestResource);
+                            }
+                            if (node.coordinatorRestResource != null) {
+                                node.coordinator = new Coordinator(node.platform, scheduler,
+                                    node.coordinatorRestServer, node.coordinatorRestResource);
+                            }
+                        } catch (Exception e) {
+                            log.error("Unable to initialize {}", nodeName, e);
+                            failure.compareAndSet(null, e);
+                        }
+                        return null;
+                    }
+                });
+            }
+            executor.shutdown();
+            executor.awaitTermination(1, TimeUnit.DAYS);
+            Exception failureException = failure.get();
+            if (failureException != null) {
+                throw failureException;
             }
+
             TreeMap<String, Agent> agents = new TreeMap<>();
             Coordinator coordinator = null;
             for (Map.Entry<String, NodeData> entry : nodes.entrySet()) {
@@ -209,7 +235,7 @@ public class MiniTrogdorCluster implements AutoCloseable {
         if (coordinator == null) {
             throw new RuntimeException("No coordinator configured.");
         }
-        return new CoordinatorClient("localhost", coordinator.port());
+        return new CoordinatorClient(10, "localhost", coordinator.port());
     }
 
     public AgentClient agentClient(String nodeName) {
@@ -217,17 +243,18 @@ public class MiniTrogdorCluster implements AutoCloseable {
         if (agent == null) {
             throw new RuntimeException("No agent configured on node " + nodeName);
         }
-        return new AgentClient("localhost", agent.port());
+        return new AgentClient(10, "localhost", agent.port());
     }
 
     @Override
     public void close() throws Exception {
+        log.info("Closing MiniTrogdorCluster.");
+        if (coordinator != null) {
+            coordinator.beginShutdown(false);
+        }
         for (Agent agent : agents.values()) {
             agent.beginShutdown();
         }
-        if (coordinator != null) {
-            coordinator.beginShutdown();
-        }
         for (Agent agent : agents.values()) {
             agent.waitForShutdown();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 75109d2..4973823 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -17,23 +17,28 @@
 
 package org.apache.kafka.trogdor.coordinator;
 
+import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.trogdor.agent.AgentClient;
 import org.apache.kafka.trogdor.common.CapturingCommandRunner;
-import org.apache.kafka.trogdor.common.ExpectedFaults;
+import org.apache.kafka.trogdor.common.ExpectedTasks;
+import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
 import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
 
-import org.apache.kafka.trogdor.fault.DoneState;
 import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
-import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
-import org.apache.kafka.trogdor.fault.PendingState;
-import org.apache.kafka.trogdor.fault.RunningState;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
-import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskRequest;
+import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskDone;
+import org.apache.kafka.trogdor.rest.TaskPending;
+import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.WorkerDone;
+import org.apache.kafka.trogdor.rest.WorkerRunning;
+import org.apache.kafka.trogdor.task.NoOpTaskSpec;
 import org.junit.Rule;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
@@ -57,62 +62,143 @@ public class CoordinatorTest {
         try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
                 addCoordinator("node01").
                 build()) {
-            CoordinatorStatusResponse status = cluster.coordinatorClient().getStatus();
-            assertEquals(cluster.coordinator().startTimeMs(), status.startTimeMs());
+            CoordinatorStatusResponse status = cluster.coordinatorClient().status();
+            assertEquals(cluster.coordinator().status(), status);
         }
     }
 
     @Test
-    public void testCreateFault() throws Exception {
-        Time time = new MockTime(0, 0, 0);
+    public void testCreateTask() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
         try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
                 addCoordinator("node01").
-                time(time).
+                addAgent("node02").
+                scheduler(scheduler).
                 build()) {
-            new ExpectedFaults().waitFor(cluster.coordinatorClient());
+            new ExpectedTasks().waitFor(cluster.coordinatorClient());
 
-            NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(1, 2);
-            cluster.coordinatorClient().putFault(
-                new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec));
-            new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, new PendingState()).
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 2);
+            cluster.coordinatorClient().createTask(
+                new CreateTaskRequest("foo", fooSpec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskPending(fooSpec)).
+                    build()).
                 waitFor(cluster.coordinatorClient());
 
             time.sleep(2);
-            new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, new DoneState(2, "")).
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2)).
+                    workerState(new WorkerRunning(fooSpec, 2, "")).
+                    build()).
+                waitFor(cluster.coordinatorClient()).
+                waitFor(cluster.agentClient("node02"));
+
+            time.sleep(3);
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskDone(fooSpec, 2, 5, "", false)).
+                    build()).
                 waitFor(cluster.coordinatorClient());
         }
     }
 
     @Test
-    public void testFaultDistribution() throws Exception {
-        Time time = new MockTime(0, 0, 0);
+    public void testTaskDistribution() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
         try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
                 addCoordinator("node01").
                 addAgent("node01").
                 addAgent("node02").
-                time(time).
+                scheduler(scheduler).
                 build()) {
             CoordinatorClient coordinatorClient = cluster.coordinatorClient();
             AgentClient agentClient1 = cluster.agentClient("node01");
             AgentClient agentClient2 = cluster.agentClient("node02");
 
-            NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2);
-            coordinatorClient.putFault(new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec));
-            new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, new PendingState()).
-                waitFor(coordinatorClient);
-            new ExpectedFaults().
+            new ExpectedTasks().
+                waitFor(coordinatorClient).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
 
-            time.sleep(10);
-            new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, new DoneState(10, "")).
-                waitFor(coordinatorClient);
-            new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, new RunningState(10)).
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2);
+            coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()).
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+
+            time.sleep(11);
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 11)).
+                    workerState(new WorkerRunning(fooSpec, 11, "")).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+
+            time.sleep(2);
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskDone(fooSpec, 11, 13, "", false)).
+                    workerState(new WorkerDone(fooSpec, 11, 13, "", "")).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+        }
+    }
+
+    @Test
+    public void testTaskCancellation() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node01").
+            addAgent("node02").
+            scheduler(scheduler).
+            build()) {
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+            AgentClient agentClient1 = cluster.agentClient("node01");
+            AgentClient agentClient2 = cluster.agentClient("node02");
+
+            new ExpectedTasks().
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2);
+            coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()).
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+
+            time.sleep(11);
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 11)).
+                    workerState(new WorkerRunning(fooSpec, 11, "")).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(agentClient1).
+                waitFor(agentClient2);
+
+            time.sleep(1);
+            coordinatorClient.stopTask(new StopTaskRequest("foo"));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskDone(fooSpec, 11, 12, "", true)).
+                    workerState(new WorkerDone(fooSpec, 11, 12, "", "")).
+                    build()).
+                waitFor(coordinatorClient).
                 waitFor(agentClient1).
                 waitFor(agentClient2);
         }
@@ -191,24 +277,29 @@ public class CoordinatorTest {
                     new String[] {"node01", "node02"},
                     new String[] {"node03"},
                 }));
-            coordinatorClient.putFault(new CreateCoordinatorFaultRequest("netpart", spec));
-            new ExpectedFaults().
-                addFault("netpart", spec).
+            coordinatorClient.createTask(new CreateTaskRequest("netpart", spec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("netpart").taskSpec(spec).build()).
                 waitFor(coordinatorClient);
-            new ExpectedLines().
-                addLine("sudo iptables -A INPUT -p tcp -s 127.0.0.1 -j DROP " +
-                        "-m comment --comment node03").
-                waitFor("node01", runner);
-            new ExpectedLines().
-                addLine("sudo iptables -A INPUT -p tcp -s 127.0.0.1 -j DROP " +
-                        "-m comment --comment node03").
-                waitFor("node02", runner);
-            new ExpectedLines().
-                addLine("sudo iptables -A INPUT -p tcp -s 127.0.0.1 -j DROP " +
-                        "-m comment --comment node01").
-                addLine("sudo iptables -A INPUT -p tcp -s 127.0.0.1 -j DROP " +
-                        "-m comment --comment node02").
-                waitFor("node03", runner);
+            checkLines("-A", runner);
         }
+        checkLines("-D", runner);
+    }
+
+    private void checkLines(String prefix, CapturingCommandRunner runner) throws InterruptedException {
+        new ExpectedLines().
+            addLine("sudo iptables " + prefix + " INPUT -p tcp -s 127.0.0.1 -j DROP " +
+                "-m comment --comment node03").
+            waitFor("node01", runner);
+        new ExpectedLines().
+            addLine("sudo iptables " + prefix + " INPUT -p tcp -s 127.0.0.1 -j DROP " +
+                "-m comment --comment node03").
+            waitFor("node02", runner);
+        new ExpectedLines().
+            addLine("sudo iptables " + prefix + " INPUT -p tcp -s 127.0.0.1 -j DROP " +
+                "-m comment --comment node01").
+            addLine("sudo iptables " + prefix + " INPUT -p tcp -s 127.0.0.1 -j DROP " +
+                "-m comment --comment node02").
+            waitFor("node03", runner);
     }
 };

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/test/java/org/apache/kafka/trogdor/fault/FaultSetTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/fault/FaultSetTest.java b/tools/src/test/java/org/apache/kafka/trogdor/fault/FaultSetTest.java
deleted file mode 100644
index 5f097b6..0000000
--- a/tools/src/test/java/org/apache/kafka/trogdor/fault/FaultSetTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-public class FaultSetTest {
-    private static final NoOpFault FAULT_A =
-        new NoOpFault("faultA", new NoOpFaultSpec(0, 100));
-
-    private static final NoOpFault FAULT_B =
-        new NoOpFault("faultB", new NoOpFaultSpec(20, 60));
-
-    private static final NoOpFault FAULT_C =
-        new NoOpFault("faultC", new NoOpFaultSpec(40, 50));
-
-    private static final NoOpFault FAULT_D =
-        new NoOpFault("faultD", new NoOpFaultSpec(50, 10));
-
-    private static final List<Fault> FAULTS_IN_START_ORDER =
-        Arrays.<Fault>asList(FAULT_A, FAULT_B, FAULT_C, FAULT_D);
-
-    private static final List<Fault> FAULTS_IN_END_ORDER =
-        Arrays.<Fault>asList(FAULT_D, FAULT_B, FAULT_C, FAULT_A);
-
-    @Test
-    public void testIterateByStart() throws Exception {
-        FaultSet faultSet = new FaultSet();
-        for (Fault fault: FAULTS_IN_END_ORDER) {
-            faultSet.add(fault);
-        }
-        int i = 0;
-        for (Iterator<Fault> iter = faultSet.iterateByStart(); iter.hasNext(); ) {
-            Fault fault = iter.next();
-            assertEquals(FAULTS_IN_START_ORDER.get(i), fault);
-            i++;
-        }
-    }
-
-    @Test
-    public void testIterateByEnd() throws Exception {
-        FaultSet faultSet = new FaultSet();
-        for (Fault fault: FAULTS_IN_START_ORDER) {
-            faultSet.add(fault);
-        }
-        int i = 0;
-        for (Iterator<Fault> iter = faultSet.iterateByEnd(); iter.hasNext(); ) {
-            Fault fault = iter.next();
-            assertEquals(FAULTS_IN_END_ORDER.get(i), fault);
-            i++;
-        }
-    }
-
-    @Test
-    public void testDeletes() throws Exception {
-        FaultSet faultSet = new FaultSet();
-        for (Fault fault: FAULTS_IN_START_ORDER) {
-            faultSet.add(fault);
-        }
-        Iterator<Fault> iter = faultSet.iterateByEnd();
-        iter.next();
-        iter.next();
-        iter.remove();
-        iter.next();
-        iter.next();
-        iter.remove();
-        assertFalse(iter.hasNext());
-        try {
-            iter.next();
-            fail("expected NoSuchElementException");
-        } catch (NoSuchElementException e) {
-        }
-        iter = faultSet.iterateByEnd();
-        assertEquals(FAULT_D, iter.next());
-        assertEquals(FAULT_C, iter.next());
-        assertFalse(iter.hasNext());
-        iter = faultSet.iterateByStart();
-        faultSet.remove(FAULT_C);
-        assertEquals(FAULT_D, iter.next());
-        assertFalse(iter.hasNext());
-    }
-
-    @Test
-    public void testEqualRanges() throws Exception {
-        FaultSet faultSet = new FaultSet();
-        faultSet.add(new NoOpFault("fault1", new NoOpFaultSpec(10, 20)));
-        faultSet.add(new NoOpFault("fault2", new NoOpFaultSpec(10, 20)));
-        faultSet.add(new NoOpFault("fault3", new NoOpFaultSpec(10, 20)));
-        faultSet.add(new NoOpFault("fault4", new NoOpFaultSpec(10, 20)));
-        for (Iterator<Fault> iter = faultSet.iterateByStart(); iter.hasNext(); ) {
-            Fault fault = iter.next();
-            if (fault.id().equals("fault3")) {
-                iter.remove();
-            }
-        }
-        Iterator<Fault> iter = faultSet.iterateByStart();
-        assertEquals("fault1", iter.next().id());
-        assertEquals("fault2", iter.next().id());
-        assertEquals("fault4", iter.next().id());
-        assertFalse(iter.hasNext());
-    }
-};

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskController.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskController.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskController.java
new file mode 100644
index 0000000..2640c39
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskController.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import org.apache.kafka.trogdor.common.Topology;
+
+import java.util.Set;
+
+public class SampleTaskController implements TaskController {
+    @Override
+    public Set<String> targetNodes(Topology topology) {
+        return Topology.Util.agentNodeNames(topology);
+    }
+};

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
new file mode 100644
index 0000000..2bbbb20
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class SampleTaskSpec extends TaskSpec {
+    private final long exitMs;
+    private final String error;
+
+    @JsonCreator
+    public SampleTaskSpec(@JsonProperty("startMs") long startMs,
+                        @JsonProperty("durationMs") long durationMs,
+                        @JsonProperty("exitMs") long exitMs,
+                        @JsonProperty("error") String error) {
+        super(startMs, durationMs);
+        this.exitMs = exitMs;
+        this.error = error;
+    }
+
+    @JsonProperty
+    public long exitMs() {
+        return exitMs;
+    }
+
+    @JsonProperty
+    public String error() {
+        return error;
+    }
+
+    @Override
+    public TaskController newController(String id) {
+        return new SampleTaskController();
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new SampleTaskWorker(this);
+    }
+};

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
new file mode 100644
index 0000000..ebac27e
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.task;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SampleTaskWorker implements TaskWorker {
+    private final SampleTaskSpec spec;
+    private final ScheduledExecutorService executor;
+    private Future<Void> future;
+
+    SampleTaskWorker(SampleTaskSpec spec) {
+        this.spec = spec;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+            ThreadUtils.createThreadFactory("SampleTaskWorker", false));
+        this.future = null;
+    }
+
+    @Override
+    public synchronized void start(Platform platform, AtomicReference<String> status,
+                      final KafkaFutureImpl<String> haltFuture) throws Exception {
+        if (this.future != null)
+            return;
+        this.future = platform.scheduler().schedule(executor, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                haltFuture.complete(spec.error());
+                return null;
+            }
+        }, spec.exitMs());
+    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        this.future.cancel(false);
+        this.executor.shutdown();
+        this.executor.awaitTermination(1, TimeUnit.DAYS);
+    }
+};


Mime
View raw message