kafka-commits mailing list archives

From gwens...@apache.org
Subject [3/4] kafka git commit: KAFKA-2369: Add REST API for Copycat.
Date Fri, 30 Oct 2015 22:00:14 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
index 46c7686..4c88737 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
@@ -21,12 +21,17 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.connector.ConnectorContext;
+import org.apache.kafka.copycat.errors.AlreadyExistsException;
 import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.NotFoundException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.HerderConnectorContext;
 import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.runtime.rest.RestServer;
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.storage.KafkaConfigStorage;
 import org.apache.kafka.copycat.util.Callback;
@@ -34,7 +39,10 @@ import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -92,30 +100,29 @@ public class DistributedHerder implements Herder, Runnable {
     private final Queue<HerderRequest> requests = new LinkedBlockingDeque<>();
     // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
-    private final Set<String> connectorConfigUpdates = new HashSet<>();
+    private Set<String> connectorConfigUpdates = new HashSet<>();
     private boolean needsReconfigRebalance;
 
-    public DistributedHerder(Worker worker, Map<String, ?> configs) {
-        this(worker, configs, null, null);
+    public DistributedHerder(DistributedConfig config, Worker worker, String restUrl) {
+        this(config, worker, null, null, restUrl);
     }
 
     // public for testing
-    public DistributedHerder(Worker worker, Map<String, ?> configs, KafkaConfigStorage configStorage, WorkerGroupMember member) {
+    public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl) {
         this.worker = worker;
         if (configStorage != null) {
             // For testing. Assume configuration has already been performed
             this.configStorage = configStorage;
         } else {
             this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), connectorConfigCallback(), taskConfigCallback());
-            this.configStorage.configure(configs);
+            this.configStorage.configure(config.originals());
         }
         configState = ClusterConfigState.EMPTY;
 
-        DistributedHerderConfig config = new DistributedHerderConfig(configs);
-        this.workerSyncTimeoutMs = config.getInt(DistributedHerderConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
-        this.workerUnsyncBackoffMs = config.getInt(DistributedHerderConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
+        this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
+        this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
 
-        this.member = member != null ? member : new WorkerGroupMember(config, this.configStorage, rebalanceListener());
+        this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener());
         stopping = new AtomicBoolean(false);
 
         rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks
@@ -143,6 +150,9 @@ public class DistributedHerder implements Herder, Runnable {
             halt();
 
             log.info("Herder stopped");
+        } catch (Throwable t) {
+            log.error("Uncaught exception in herder work thread, exiting: ", t);
+            System.exit(1);
         } finally {
             stopLatch.countDown();
         }
@@ -169,14 +179,17 @@ public class DistributedHerder implements Herder, Runnable {
         // Process any external requests
         while (!requests.isEmpty()) {
             HerderRequest request = requests.poll();
+            Callback<Void> cb = request.callback();
             try {
-                request.callback().onCompletion(null, request.action().call());
+                request.action().call();
+                cb.onCompletion(null, null);
             } catch (Throwable t) {
-                request.callback().onCompletion(t, null);
+                cb.onCompletion(t, null);
             }
         }
 
         // Process any configuration updates
+        Set<String> connectorConfigUpdatesCopy = null;
         synchronized (this) {
             if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty()) {
                 // Connector reconfigs only need local updates since there is no coordination between workers required.
@@ -196,21 +209,31 @@ public class DistributedHerder implements Herder, Runnable {
                     needsReconfigRebalance = false;
                     return;
                 } else if (!connectorConfigUpdates.isEmpty()) {
-                    // If we only have connector config updates, we can just bounce the updated connectors that are
-                    // currently assigned to this worker.
-                    Set<String> localConnectors = worker.connectorNames();
-                    for (String connectorName : connectorConfigUpdates) {
-                        if (!localConnectors.contains(connectorName))
-                            continue;
-                        worker.stopConnector(connectorName);
-                        // The update may be a deletion, so verify we actually need to restart the connector
-                        if (configState.connectors().contains(connectorName))
-                            startConnector(connectorName);
-                    }
-                    connectorConfigUpdates.clear();
+                    // We can't start/stop while locked since starting connectors can cause task updates that will
+                    // require writing configs, which in turn make callbacks into this class from another thread that
+                    // require acquiring a lock. This leads to deadlock. Instead, just copy the info we need and process
+                    // the updates after unlocking.
+                    connectorConfigUpdatesCopy = connectorConfigUpdates;
+                    connectorConfigUpdates = new HashSet<>();
                 }
             }
         }
+        if (connectorConfigUpdatesCopy != null) {
+            // If we only have connector config updates, we can just bounce the updated connectors that are
+            // currently assigned to this worker.
+            Set<String> localConnectors = assignment == null ? Collections.<String>emptySet() : new HashSet<>(assignment.connectors());
+            for (String connectorName : connectorConfigUpdatesCopy) {
+                if (!localConnectors.contains(connectorName))
+                    continue;
+                boolean remains = configState.connectors().contains(connectorName);
+                log.info("Handling connector-only config update by {} connector {}",
+                        remains ? "restarting" : "stopping", connectorName);
+                worker.stopConnector(connectorName);
+                // The update may be a deletion, so verify we actually need to restart the connector
+                if (remains)
+                    startConnector(connectorName);
+            }
+        }
 
         // Let the group take any actions it needs to
         try {
@@ -272,69 +295,152 @@ public class DistributedHerder implements Herder, Runnable {
     }
 
     @Override
-    public synchronized void addConnector(final Map<String, String> connectorProps,
-                                          final Callback<String> callback) {
-        final ConnectorConfig connConfig;
-        final String connName;
-        try {
-            connConfig = new ConnectorConfig(connectorProps);
-            connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        } catch (Throwable t) {
-            if (callback != null)
-                callback.onCompletion(t, null);
-            return;
-        }
-
-        log.debug("Submitting connector config {}", connName);
+    public synchronized void connectors(final Callback<Collection<String>> callback) {
+        log.trace("Submitting connector listing request");
 
         requests.add(new HerderRequest(
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
-                        if (!isLeader())
-                            throw new NotLeaderException("Only the leader can add connectors.");
+                        if (!checkConfigSynced(callback))
+                            return null;
 
-                        log.debug("Submitting connector config {}", connName);
-                        configStorage.putConnectorConfig(connName, connectorProps);
+                        callback.onCompletion(null, configState.connectors());
+                        return null;
+                    }
+                }
+        ));
+        member.wakeup();
+    }
 
+    @Override
+    public synchronized void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) {
+        log.trace("Submitting connector info request {}", connName);
+
+        requests.add(new HerderRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        if (!checkConfigSynced(callback))
+                            return null;
+
+                        if (!configState.connectors().contains(connName)) {
+                            callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
+                        } else {
+                            callback.onCompletion(null, new ConnectorInfo(connName, configState.connectorConfig(connName), configState.tasks(connName)));
+                        }
                         return null;
                     }
-                },
-                new Callback<Void>() {
+                }
+        ));
+        member.wakeup();
+    }
+
+    @Override
+    public void connectorConfig(String connName, final Callback<Map<String, String>> callback) {
+        // Subset of connectorInfo, so piggy back on that implementation
+        log.trace("Submitting connector config read request {}", connName);
+        connectorInfo(connName, new Callback<ConnectorInfo>() {
+            @Override
+            public void onCompletion(Throwable error, ConnectorInfo result) {
+                if (error != null)
+                    callback.onCompletion(error, null);
+                else
+                    callback.onCompletion(null, result.config());
+            }
+        });
+    }
+
+    @Override
+    public void putConnectorConfig(final String connName, Map<String, String> config, final boolean allowReplace,
+                                   final Callback<Created<ConnectorInfo>> callback) {
+        final Map<String, String> connConfig;
+        if (config == null) {
+            connConfig = null;
+        } else if (!config.containsKey(ConnectorConfig.NAME_CONFIG)) {
+            connConfig = new HashMap<>(config);
+            connConfig.put(ConnectorConfig.NAME_CONFIG, connName);
+        } else {
+            connConfig = config;
+        }
+
+        log.trace("Submitting connector config write request {}", connName);
+
+        requests.add(new HerderRequest(
+                new Callable<Void>() {
                     @Override
-                    public void onCompletion(Throwable error, Void result) {
-                        if (callback == null) return;
+                    public Void call() throws Exception {
+                        log.trace("Handling connector config request {}", connName);
+                        if (!isLeader()) {
+                            callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
+                            return null;
+                        }
+
+                        boolean exists = configState.connectors().contains(connName);
+                        if (!allowReplace && exists) {
+                            callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
+                            return null;
+                        }
+
+                        if (connConfig == null && !exists) {
+                            callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
+                            return null;
+                        }
+
+                        log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
+                        configStorage.putConnectorConfig(connName, connConfig);
+
+                        boolean created = !exists && connConfig != null;
+                        // Note that we use the updated connector config despite the fact that we don't have an updated
+                        // snapshot yet. The existing task info should still be accurate.
+                        ConnectorInfo info = connConfig == null ? null :
+                                new ConnectorInfo(connName, connConfig, configState.tasks(connName));
+                        callback.onCompletion(null, new Created<>(created, info));
 
-                        if (error != null)
-                            callback.onCompletion(error, null);
-                        else
-                            callback.onCompletion(null, connName);
+                        return null;
                     }
                 }));
         member.wakeup();
     }
 
     @Override
-    public synchronized void deleteConnector(final String connName, final Callback<Void> callback) {
-        log.debug("Submitting connector config deletion {}", connName);
+    public synchronized void requestTaskReconfiguration(final String connName) {
+        log.trace("Submitting connector task reconfiguration request {}", connName);
 
         requests.add(new HerderRequest(
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
-                        if (!isLeader())
-                            throw new NotLeaderException("Only the leader can delete connectors.");
-
-                        log.debug("Submitting null connector config {}", connName);
-                        configStorage.putConnectorConfig(connName, null);
+                        reconfigureConnector(connName);
                         return null;
                     }
-                },
-                new Callback<Void>() {
+                }
+        ));
+        member.wakeup();
+    }
+
+    @Override
+    public synchronized void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) {
+        log.trace("Submitting get task configuration request {}", connName);
+
+        requests.add(new HerderRequest(
+                new Callable<Void>() {
                     @Override
-                    public void onCompletion(Throwable error, Void result) {
-                        if (callback != null)
-                            callback.onCompletion(error, null);
+                    public Void call() throws Exception {
+                        if (!checkConfigSynced(callback))
+                            return null;
+
+                        if (!configState.connectors().contains(connName)) {
+                            callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
+                        } else {
+                            List<TaskInfo> result = new ArrayList<>();
+                            for (int i = 0; i < configState.taskCount(connName); i++) {
+                                ConnectorTaskId id = new ConnectorTaskId(connName, i);
+                                result.add(new TaskInfo(id, configState.taskConfig(id)));
+                            }
+                            callback.onCompletion(null, result);
+                        }
+                        return null;
                     }
                 }
         ));
@@ -342,12 +448,21 @@ public class DistributedHerder implements Herder, Runnable {
     }
 
     @Override
-    public synchronized void requestTaskReconfiguration(final String connName) {
+    public synchronized void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) {
+        log.trace("Submitting put task configuration request {}", connName);
+
         requests.add(new HerderRequest(
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
-                        reconfigureConnector(connName);
+                        if (!isLeader())
+                            callback.onCompletion(new NotLeaderException("Only the leader may write task configurations.", leaderUrl()), null);
+                        else if (!configState.connectors().contains(connName))
+                            callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
+                        else {
+                            configStorage.putTaskConfigs(taskConfigListAsMap(connName, configs));
+                            callback.onCompletion(null, null);
+                        }
                         return null;
                     }
                 }
@@ -356,11 +471,21 @@ public class DistributedHerder implements Herder, Runnable {
     }
 
 
+    // Should only be called from work thread, so synchronization should not be needed
     private boolean isLeader() {
         return assignment != null && member.memberId().equals(assignment.leader());
     }
 
     /**
+     * Get the URL for the leader's REST interface, or null if we do not have the leader's URL yet.
+     */
+    private String leaderUrl() {
+        if (assignment == null)
+            return null;
+        return assignment.leaderUrl();
+    }
+
+    /**
     * Handle post-assignment operations, either trying to resolve issues that kept assignment from completing, or
     * getting this node into sync and its work started.
      *
@@ -370,8 +495,6 @@ public class DistributedHerder implements Herder, Runnable {
         if (this.rebalanceResolved)
             return true;
 
-        rebalanceResolved = true;
-
         // We need to handle a variety of cases after a rebalance:
         // 1. Assignment failed
         //  1a. We are the leader for the round. We will be leader again if we rejoin now, so we need to catch up before
@@ -430,6 +553,10 @@ public class DistributedHerder implements Herder, Runnable {
 
         startWork();
 
+        // We only mark this as resolved once we've actually started work, which allows us to correctly track
+        // what work is currently active and running. If we bail early, the main tick loop + having requested rejoin
+        // guarantees we'll attempt to rejoin before executing this method again.
+        rebalanceResolved = true;
         return true;
     }
 
@@ -483,6 +610,7 @@ public class DistributedHerder implements Herder, Runnable {
                         "configuration. This task will not execute until reconfigured.", e);
             }
         }
+        log.info("Finished starting connectors and tasks");
     }
 
     // Helper for starting a connector with the given name, which will extract & parse the config, generate connector
@@ -511,27 +639,49 @@ public class DistributedHerder implements Herder, Runnable {
         if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
             sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
 
-        Map<ConnectorTaskId, Map<String, String>> taskProps
-                = worker.reconfigureConnectorTasks(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
+        List<Map<String, String>> taskProps
+                = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
         boolean changed = false;
         int currentNumTasks = configState.taskCount(connName);
         if (taskProps.size() != currentNumTasks) {
             log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
             changed = true;
         } else {
-            for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfig : taskProps.entrySet()) {
-                if (!taskConfig.getValue().equals(configState.taskConfig(taskConfig.getKey()))) {
+            int index = 0;
+            for (Map<String, String> taskConfig : taskProps) {
+                if (!taskConfig.equals(configState.taskConfig(new ConnectorTaskId(connName, index)))) {
                     log.debug("Change in task configurations, writing updated task configurations");
                     changed = true;
                     break;
                 }
+                index++;
             }
         }
         if (changed) {
-            // FIXME: Configs should only be written by the leader to avoid conflicts due to zombies. However, until the
-            // REST API is available to forward this request, we need to do this on the worker that generates the config
-            configStorage.putTaskConfigs(taskProps);
+            if (isLeader()) {
+                configStorage.putTaskConfigs(taskConfigListAsMap(connName, taskProps));
+            } else {
+                try {
+                    String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
+                    RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
+                } catch (CopycatException e) {
+                    log.error("Request to leader to reconfigure connector tasks failed", e);
+                }
+            }
+        }
+    }
+
+    // Common handling for requests that get config data. Checks if we are in sync with the current config, which allows
+    // us to answer requests directly. If we are not, invokes the callback with the appropriate error.
+    private boolean checkConfigSynced(Callback<?> callback) {
+        if (assignment == null || configState.offset() != assignment.offset()) {
+            if (!isLeader())
+                callback.onCompletion(new NotLeaderException("Cannot get config data because config is not in sync and this is not the leader", leaderUrl()), null);
+            else
+                callback.onCompletion(new CopycatException("Cannot get config data because this is the leader node, but it does not have the most up to date configs"), null);
+            return false;
         }
+        return true;
     }
 
 
@@ -572,7 +722,7 @@ public class DistributedHerder implements Herder, Runnable {
         return new Callback<String>() {
             @Override
             public void onCompletion(Throwable error, String connector) {
-                log.debug("Connector {} config updated", connector);
+                log.info("Connector {} config updated", connector);
                 // Stage the update and wake up the work thread. Connector config *changes* only need the one connector
                 // to be bounced. However, this callback may also indicate a connector *addition*, which does require
                 // a rebalance, so we need to be careful about what operation we request.
@@ -588,7 +738,7 @@ public class DistributedHerder implements Herder, Runnable {
         return new Callback<List<ConnectorTaskId>>() {
             @Override
             public void onCompletion(Throwable error, List<ConnectorTaskId> tasks) {
-                log.debug("Tasks {} configs updated", tasks);
+                log.info("Tasks {} configs updated", tasks);
                 // Stage the update and wake up the work thread. No need to record the set of tasks here because task reconfigs
                 // always need a rebalance to ensure offsets get committed.
                 // TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task
@@ -612,8 +762,10 @@ public class DistributedHerder implements Herder, Runnable {
                 // group membership actions (e.g., we may need to explicitly leave the group if we cannot handle the
                 // assigned tasks).
                 log.info("Joined group and got assignment: {}", assignment);
-                DistributedHerder.this.assignment = assignment;
-                rebalanceResolved = false;
+                synchronized (DistributedHerder.this) {
+                    DistributedHerder.this.assignment = assignment;
+                    rebalanceResolved = false;
+                }
                 // We *must* interrupt any poll() call since this could occur when the poll starts, and we might then
                 // sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the
                 // main thread.
@@ -627,21 +779,40 @@ public class DistributedHerder implements Herder, Runnable {
                // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
                 // it is still important to have a leader that can write configs, offsets, etc.
 
-                // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of
-                // them to finish
-                // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from
-                // this worker. Instead, we can let them continue to run but buffer any update requests (which should be
-                // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
-                // unnecessary repeated connections to the source/sink system.
-                for (String connectorName : connectors)
-                    worker.stopConnector(connectorName);
-                // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
-                // stopping them then state could continue to be reused when the task remains on this worker. For example,
-                // this would avoid having to close a connection and then reopen it when the task is assigned back to this
-                // worker again.
-                for (ConnectorTaskId taskId : tasks)
-                    worker.stopTask(taskId);
+                if (rebalanceResolved) {
+                    // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of
+                    // them to finish
+                    // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from
+                    // this worker. Instead, we can let them continue to run but buffer any update requests (which should be
+                    // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
+                    // unnecessary repeated connections to the source/sink system.
+                    for (String connectorName : connectors)
+                        worker.stopConnector(connectorName);
+                    // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
+                    // stopping them then state could continue to be reused when the task remains on this worker. For example,
+                    // this would avoid having to close a connection and then reopen it when the task is assigned back to this
+                    // worker again.
+                    for (ConnectorTaskId taskId : tasks)
+                        worker.stopTask(taskId);
+
+                    log.info("Finished stopping tasks in preparation for rebalance");
+                } else {
+                    log.info("Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks");
+                }
+
             }
         };
     }
+
+
+    private static Map<ConnectorTaskId, Map<String, String>> taskConfigListAsMap(String connName, List<Map<String, String>> configs) {
+        int index = 0;
+        Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
+        for (Map<String, String> taskConfigMap : configs) {
+            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
+            result.put(taskId, taskConfigMap);
+            index++;
+        }
+        return result;
+    }
 }
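The copy-then-process pattern this patch introduces for connector config updates (swap the shared set while holding the lock, act on the copy after unlocking, so processing can safely call back into lock-taking methods) can be sketched in isolation. The names below are illustrative, not code from the patch:

import java.util.HashSet;
import java.util.Set;

public class CopyThenProcess {
    private Set<String> pendingUpdates = new HashSet<>();

    // Called from other threads (e.g. config storage callbacks)
    public synchronized void stage(String connectorName) {
        pendingUpdates.add(connectorName);
    }

    // Work-thread tick: take the whole set under the lock, replace it with a
    // fresh one, then process outside the lock. Processing may itself call
    // stage() on this object without deadlocking.
    public void tick() {
        Set<String> updatesCopy;
        synchronized (this) {
            if (pendingUpdates.isEmpty())
                return;
            updatesCopy = pendingUpdates;
            pendingUpdates = new HashSet<>();
        }
        for (String name : updatesCopy)
            process(name);
    }

    private void process(String name) {
        // stop, and possibly restart, the named connector
    }
}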

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
deleted file mode 100644
index 86c4d1e..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.distributed;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
-
-import java.util.Map;
-
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-
-public class DistributedHerderConfig extends AbstractConfig {
-    private static final ConfigDef CONFIG;
-
-    /*
-     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
-     * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
-     */
-
-    /**
-     * <code>group.id</code>
-     */
-    public static final String GROUP_ID_CONFIG = "group.id";
-    private static final String GROUP_ID_DOC = "A unique string that identifies the Copycat cluster group this worker belongs to.";
-
-    /**
-     * <code>session.timeout.ms</code>
-     */
-    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
-    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
-
-    /**
-     * <code>heartbeat.interval.ms</code>
-     */
-    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
-    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
-
-    /**
-     * <code>worker.sync.timeout.ms</code>
-     */
-    public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = "worker.sync.timeout.ms";
-    private static final String WORKER_SYNC_TIMEOUT_MS_DOC = "When the worker is out of sync with other workers and needs" +
-            " to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and" +
-            " waiting a backoff period before rejoining.";
-
-    /**
-     * <code>group.unsync.timeout.ms</code>
-     */
-    public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = "worker.unsync.backoff.ms";
-    private static final String WORKER_UNSYNC_BACKOFF_MS_DOC = "When the worker is out of sync with other workers and " +
-            " fails to catch up within worker.sync.timeout.ms, leave the Copycat cluster for this long before rejoining.";
-    public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 5 * 60 * 1000;
-
-    static {
-        CONFIG = new ConfigDef()
-                .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
-                        ConfigDef.Type.LIST,
-                        ConfigDef.Importance.HIGH,
-                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
-                .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC)
-                .define(SESSION_TIMEOUT_MS_CONFIG,
-                        ConfigDef.Type.INT,
-                        30000,
-                        ConfigDef.Importance.HIGH,
-                        SESSION_TIMEOUT_MS_DOC)
-                .define(HEARTBEAT_INTERVAL_MS_CONFIG,
-                        ConfigDef.Type.INT,
-                        3000,
-                        ConfigDef.Importance.HIGH,
-                        HEARTBEAT_INTERVAL_MS_DOC)
-                .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
-                        ConfigDef.Type.LONG,
-                        5 * 60 * 1000,
-                        atLeast(0),
-                        ConfigDef.Importance.LOW,
-                        CommonClientConfigs.METADATA_MAX_AGE_DOC)
-                .define(CommonClientConfigs.CLIENT_ID_CONFIG,
-                        ConfigDef.Type.STRING,
-                        "",
-                        ConfigDef.Importance.LOW,
-                        CommonClientConfigs.CLIENT_ID_DOC)
-                .define(CommonClientConfigs.SEND_BUFFER_CONFIG,
-                        ConfigDef.Type.INT,
-                        128 * 1024,
-                        atLeast(0),
-                        ConfigDef.Importance.MEDIUM,
-                        CommonClientConfigs.SEND_BUFFER_DOC)
-                .define(CommonClientConfigs.RECEIVE_BUFFER_CONFIG,
-                        ConfigDef.Type.INT,
-                        32 * 1024,
-                        atLeast(0),
-                        ConfigDef.Importance.MEDIUM,
-                        CommonClientConfigs.RECEIVE_BUFFER_DOC)
-                .define(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG,
-                        ConfigDef.Type.LONG,
-                        50L,
-                        atLeast(0L),
-                        ConfigDef.Importance.LOW,
-                        CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
-                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
-                        ConfigDef.Type.LONG,
-                        100L,
-                        atLeast(0L),
-                        ConfigDef.Importance.LOW,
-                        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
-                .define(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG,
-                        ConfigDef.Type.LONG,
-                        30000,
-                        atLeast(0),
-                        ConfigDef.Importance.LOW,
-                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
-                .define(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG,
-                        ConfigDef.Type.INT,
-                        2,
-                        atLeast(1),
-                        ConfigDef.Importance.LOW,
-                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
-                .define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
-                        ConfigDef.Type.LIST,
-                        "",
-                        ConfigDef.Importance.LOW,
-                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
-                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-                .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
-                .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
-                .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
-                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
-                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
-                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
-                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
-                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
-                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
-                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
-                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
-                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
-                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
-                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
-                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
-                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
-                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
-                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
-                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
-                .define(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, ConfigDef.Type.LIST, SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
-                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
-                        ConfigDef.Type.INT,
-                        40 * 1000,
-                        atLeast(0),
-                        ConfigDef.Importance.MEDIUM,
-                        CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
-                        /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
-                .define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG,
-                        ConfigDef.Type.LONG,
-                        9 * 60 * 1000,
-                        ConfigDef.Importance.MEDIUM,
-                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
-                .define(WORKER_SYNC_TIMEOUT_MS_CONFIG,
-                        ConfigDef.Type.INT,
-                        3000,
-                        ConfigDef.Importance.MEDIUM,
-                        WORKER_SYNC_TIMEOUT_MS_DOC)
-                .define(WORKER_UNSYNC_BACKOFF_MS_CONFIG,
-                        ConfigDef.Type.INT,
-                        WORKER_UNSYNC_BACKOFF_MS_DEFAULT,
-                        ConfigDef.Importance.MEDIUM,
-                        WORKER_UNSYNC_BACKOFF_MS_DOC);
-    }
-
-    DistributedHerderConfig(Map<?, ?> props) {
-        super(CONFIG, props);
-    }
-
-}
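The deleted class's settings survive in DistributedConfig, which the patch reads elsewhere (e.g. DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG in DistributedHerder). The define-then-get pattern both classes rely on, reduced to the two worker settings above — a sketch, assuming DistributedConfig keeps the same keys and defaults:

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class MiniDistributedConfig extends AbstractConfig {
    public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = "worker.sync.timeout.ms";
    public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = "worker.unsync.backoff.ms";

    // Each key is defined once with a type, default, and documentation;
    // AbstractConfig then provides parsed, typed access to the values.
    private static final ConfigDef CONFIG = new ConfigDef()
            .define(WORKER_SYNC_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, 3000,
                    ConfigDef.Importance.MEDIUM, "Time to wait for config resync before leaving the group.")
            .define(WORKER_UNSYNC_BACKOFF_MS_CONFIG, ConfigDef.Type.INT, 5 * 60 * 1000,
                    ConfigDef.Importance.MEDIUM, "Backoff before rejoining after a failed resync.");

    public MiniDistributedConfig(Map<?, ?> props) {
        super(CONFIG, props);
    }
}

// Usage: new MiniDistributedConfig(props).getInt(MiniDistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG)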

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
index ce8fba5..7e6dc67 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
@@ -24,15 +24,24 @@ import org.apache.kafka.copycat.errors.CopycatException;
  * the leader.
  */
 public class NotLeaderException extends CopycatException {
-    public NotLeaderException(String s) {
-        super(s);
+    private final String leaderUrl;
+
+    public NotLeaderException(String msg, String leaderUrl) {
+        super(msg);
+        this.leaderUrl = leaderUrl;
     }
 
-    public NotLeaderException(String s, Throwable throwable) {
-        super(s, throwable);
+    public NotLeaderException(String msg, String leaderUrl, Throwable throwable) {
+        super(msg, throwable);
+        this.leaderUrl = leaderUrl;
     }
 
-    public NotLeaderException(Throwable throwable) {
+    public NotLeaderException(String leaderUrl, Throwable throwable) {
         super(throwable);
+        this.leaderUrl = leaderUrl;
+    }
+
+    public String leaderUrl() {
+        return leaderUrl;
     }
 }
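A caller can use the new leaderUrl() accessor to retry a rejected write against the leader. An illustrative sketch, not code from the patch — the forward() helper here is hypothetical, while the patch itself uses RestServer.httpRequest(...) for the analogous task-config forwarding:

import org.apache.kafka.copycat.runtime.distributed.NotLeaderException;

import java.util.Map;

public class LeaderForwardingSketch {
    public void putConnectorConfig(String connName, Map<String, String> config) {
        try {
            writeLocally(connName, config);
        } catch (NotLeaderException e) {
            String leader = e.leaderUrl(); // may be null if no leader is known yet
            if (leader == null)
                throw e;
            forward(leader, "/connectors/" + connName + "/config", config);
        }
    }

    private void writeLocally(String connName, Map<String, String> config) {
        // Stand-in for the herder call that rejects writes on non-leaders
        throw new NotLeaderException("Only the leader can set connector configs.", "http://leader:8083");
    }

    private void forward(String leaderUrl, String path, Object body) {
        // Hypothetical forward, e.g. an HTTP PUT of the config to leaderUrl + path
    }
}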

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
index 2fef37c..9fdbac7 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
@@ -48,6 +48,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     // Currently Copycat doesn't support multiple task assignment strategies, so we just fill in a default value
     public static final String DEFAULT_SUBPROTOCOL = "default";
 
+    private final String restUrl;
     private final KafkaConfigStorage configStorage;
     private CopycatProtocol.Assignment assignmentSnapshot;
     private final CopycatWorkerCoordinatorMetrics sensors;
@@ -69,6 +70,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                              Time time,
                              long requestTimeoutMs,
                              long retryBackoffMs,
+                             String restUrl,
                              KafkaConfigStorage configStorage,
                              WorkerRebalanceListener listener) {
         super(client,
@@ -81,6 +83,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                 time,
                 requestTimeoutMs,
                 retryBackoffMs);
+        this.restUrl = restUrl;
         this.configStorage = configStorage;
         this.assignmentSnapshot = null;
         this.sensors = new CopycatWorkerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
@@ -101,8 +104,8 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     public LinkedHashMap<String, ByteBuffer> metadata() {
         LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
         configSnapshot = configStorage.snapshot();
-        CopycatProtocol.ConfigState configState = new CopycatProtocol.ConfigState(configSnapshot.offset());
-        metadata.put(DEFAULT_SUBPROTOCOL, CopycatProtocol.serializeMetadata(configState));
+        CopycatProtocol.WorkerState workerState = new CopycatProtocol.WorkerState(restUrl, configSnapshot.offset());
+        metadata.put(DEFAULT_SUBPROTOCOL, CopycatProtocol.serializeMetadata(workerState));
         return metadata;
     }
 
@@ -121,7 +124,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
         log.debug("Performing task assignment");
 
-        Map<String, CopycatProtocol.ConfigState> allConfigs = new HashMap<>();
+        Map<String, CopycatProtocol.WorkerState> allConfigs = new HashMap<>();
         for (Map.Entry<String, ByteBuffer> entry : allMemberMetadata.entrySet())
             allConfigs.put(entry.getKey(), CopycatProtocol.deserializeMetadata(entry.getValue()));
 
@@ -129,16 +132,17 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         Long leaderOffset = ensureLeaderConfig(maxOffset);
         if (leaderOffset == null)
             return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.CONFIG_MISMATCH,
-                    leaderId, maxOffset, new HashMap<String, List<String>>(), new HashMap<String, List<ConnectorTaskId>>());
+                    leaderId, allConfigs.get(leaderId).url(), maxOffset,
+                    new HashMap<String, List<String>>(), new HashMap<String, List<ConnectorTaskId>>());
         return performTaskAssignment(leaderId, leaderOffset, allConfigs);
     }
 
-    private long findMaxMemberConfigOffset(Map<String, CopycatProtocol.ConfigState> allConfigs) {
+    private long findMaxMemberConfigOffset(Map<String, CopycatProtocol.WorkerState> allConfigs) {
         // The new config offset is the maximum seen by any member. We always perform assignment using this offset,
         // even if some members have fallen behind. The config offset used to generate the assignment is included in
         // the response so members that have fallen behind will not use the assignment until they have caught up.
         Long maxOffset = null;
-        for (Map.Entry<String, CopycatProtocol.ConfigState> stateEntry : allConfigs.entrySet()) {
+        for (Map.Entry<String, CopycatProtocol.WorkerState> stateEntry : allConfigs.entrySet()) {
             long memberRootOffset = stateEntry.getValue().offset();
             if (maxOffset == null)
                 maxOffset = memberRootOffset;
@@ -171,7 +175,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         return maxOffset;
     }
 
-    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, CopycatProtocol.ConfigState> allConfigs) {
+    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, CopycatProtocol.WorkerState> allConfigs) {
         Map<String, List<String>> connectorAssignments = new HashMap<>();
         Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>();
 
@@ -200,12 +204,13 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         }
 
         return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.NO_ERROR,
-                leaderId, maxOffset, connectorAssignments, taskAssignments);
+                leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments);
     }
 
     private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members,
                                                                 short error,
                                                                 String leaderId,
+                                                                String leaderUrl,
                                                                 long maxOffset,
                                                                 Map<String, List<String>> connectorAssignments,
                                                                 Map<String, List<ConnectorTaskId>> taskAssignments) {
@@ -218,7 +223,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
             List<ConnectorTaskId> tasks = taskAssignments.get(member);
             if (tasks == null)
                 tasks = Collections.emptyList();
-            CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(error, leaderId, maxOffset, connectors, tasks);
+            CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(error, leaderId, leaderUrl, maxOffset, connectors, tasks);
             log.debug("Assignment: {} -> {}", member, assignment);
             groupAssignment.put(member, CopycatProtocol.serializeAssignment(assignment));
         }
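The ConfigState-to-WorkerState rename reflects that join-group metadata now carries each worker's REST URL alongside its config offset, so the leader can include its own URL in every assignment. A round-trip sketch, assuming WorkerState exposes the (url, offset) constructor and accessors that the diff's call sites use:

import org.apache.kafka.copycat.runtime.distributed.CopycatProtocol;

import java.nio.ByteBuffer;

public class WorkerStateRoundTrip {
    public static void main(String[] args) {
        // Serialize this worker's state as it would be embedded in join-group metadata
        CopycatProtocol.WorkerState state =
                new CopycatProtocol.WorkerState("http://worker1:8083", 42L);
        ByteBuffer wire = CopycatProtocol.serializeMetadata(state);

        // The leader deserializes each member's metadata during assignment
        CopycatProtocol.WorkerState decoded = CopycatProtocol.deserializeMetadata(wire);
        System.out.println(decoded.url() + " at config offset " + decoded.offset());
    }
}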

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
index 03960cf..908fe59 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
@@ -68,7 +68,7 @@ public class WorkerGroupMember {
 
     private boolean stopped = false;
 
-    public WorkerGroupMember(DistributedHerderConfig config, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
+    public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
         try {
             this.time = new SystemTime();
 
@@ -98,15 +98,16 @@ public class WorkerGroupMember {
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
             this.coordinator = new WorkerCoordinator(this.client,
-                    config.getString(DistributedHerderConfig.GROUP_ID_CONFIG),
-                    config.getInt(DistributedHerderConfig.SESSION_TIMEOUT_MS_CONFIG),
-                    config.getInt(DistributedHerderConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
+                    config.getString(DistributedConfig.GROUP_ID_CONFIG),
+                    config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                     metrics,
                     metricGrpPrefix,
                     metricsTags,
                     this.time,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                     retryBackoffMs,
+                    restUrl,
                     configStorage,
                     listener);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java
new file mode 100644
index 0000000..5da747d
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.WorkerConfig;
+import org.apache.kafka.copycat.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.copycat.runtime.rest.errors.CopycatExceptionMapper;
+import org.apache.kafka.copycat.runtime.rest.errors.CopycatRestException;
+import org.apache.kafka.copycat.runtime.rest.resources.ConnectorsResource;
+import org.apache.kafka.copycat.runtime.rest.resources.RootResource;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.Slf4jRequestLog;
+import org.eclipse.jetty.server.handler.DefaultHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.server.handler.RequestLogHandler;
+import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Embedded server for the REST API that provides the control plane for Copycat workers.
+ */
+public class RestServer {
+    private static final Logger log = LoggerFactory.getLogger(RestServer.class);
+
+    private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 60 * 1000;
+
+    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
+
+    private final WorkerConfig config;
+    private Herder herder;
+    private Server jettyServer;
+
+    /**
+     * Create a REST server using the specified configs. The herder it serves is supplied later via start().
+     */
+    public RestServer(WorkerConfig config) {
+        this.config = config;
+
+        // Configure the connector here, in the constructor, so the advertised host and port are known before start() is called
+        String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG);
+        Integer port = config.getInt(WorkerConfig.REST_PORT_CONFIG);
+
+        jettyServer = new Server();
+
+        ServerConnector connector = new ServerConnector(jettyServer);
+        if (hostname != null && !hostname.isEmpty())
+            connector.setHost(hostname);
+        connector.setPort(port);
+        jettyServer.setConnectors(new Connector[]{connector});
+    }
+
+    public void start(Herder herder) {
+        log.info("Starting REST server");
+
+        this.herder = herder;
+
+        ResourceConfig resourceConfig = new ResourceConfig();
+        resourceConfig.register(new JacksonJsonProvider());
+
+        resourceConfig.register(RootResource.class);
+        resourceConfig.register(new ConnectorsResource(herder));
+
+        resourceConfig.register(CopycatExceptionMapper.class);
+
+        ServletContainer servletContainer = new ServletContainer(resourceConfig);
+        ServletHolder servletHolder = new ServletHolder(servletContainer);
+
+        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        context.setContextPath("/");
+        context.addServlet(servletHolder, "/*");
+
+        RequestLogHandler requestLogHandler = new RequestLogHandler();
+        Slf4jRequestLog requestLog = new Slf4jRequestLog();
+        requestLog.setLoggerName(RestServer.class.getCanonicalName());
+        requestLog.setLogLatency(true);
+        requestLogHandler.setRequestLog(requestLog);
+
+        HandlerCollection handlers = new HandlerCollection();
+        handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler});
+
+        /* Needed for graceful shutdown as per `setStopTimeout` documentation */
+        StatisticsHandler statsHandler = new StatisticsHandler();
+        statsHandler.setHandler(handlers);
+        jettyServer.setHandler(statsHandler);
+        jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS);
+        jettyServer.setStopAtShutdown(true);
+
+        try {
+            jettyServer.start();
+        } catch (Exception e) {
+            throw new CopycatException("Unable to start REST server", e);
+        }
+
+        log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
+    }
+
+    public void stop() {
+        try {
+            jettyServer.stop();
+            jettyServer.join();
+        } catch (Exception e) {
+            throw new CopycatException("Unable to stop REST server", e);
+        } finally {
+            jettyServer.destroy();
+        }
+    }
+
+    /**
+     * Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty
+     * server, unless overrides for advertised hostname and/or port are provided via configs.
+     */
+    public String advertisedUrl() {
+        UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
+        String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
+        if (advertisedHostname != null && !advertisedHostname.isEmpty())
+            builder.host(advertisedHostname);
+        Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
+        if (advertisedPort != null)
+            builder.port(advertisedPort);
+        else
+            builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG));
+        return builder.build().toString();
+    }
+
+    /**
+     * Send an HTTP request to the given URL and deserialize the JSON response. This is used, among other
+     * things, to forward REST requests from follower workers to the leader.
+     *
+     * @param url               URL the HTTP connection will be established with.
+     * @param method            HTTP method ("GET", "POST", "PUT", etc.)
+     * @param requestBodyData   Object to serialize as JSON and send in the request body, or null for no body.
+     * @param responseFormat    Expected format of the response to the HTTP request.
+     * @param <T>               The type of the deserialized response to the HTTP request.
+     * @return The deserialized response to the HTTP request; the body is null when the response has no content.
+     */
+    public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
+                                    TypeReference<T> responseFormat) {
+        HttpURLConnection connection = null;
+        try {
+            String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData);
+            log.debug("Sending {} with input {} to {}", method, serializedBody, url);
+
+            connection = (HttpURLConnection) new URL(url).openConnection();
+            connection.setRequestMethod(method);
+
+            connection.setRequestProperty("User-Agent", "kafka-copycat");
+            connection.setRequestProperty("Accept", "application/json");
+
+            // setDoInput defaults to true, but set it explicitly: getResponseCode() implicitly
+            // reads the input stream, so input must be enabled for every request.
+            connection.setDoInput(true);
+
+            connection.setUseCaches(false);
+
+            if (requestBodyData != null) {
+                connection.setRequestProperty("Content-Type", "application/json");
+                connection.setDoOutput(true);
+
+                // Encode the body explicitly as UTF-8 and let try-with-resources close the stream even on failure
+                try (OutputStream os = connection.getOutputStream()) {
+                    os.write(serializedBody.getBytes(StandardCharsets.UTF_8));
+                    os.flush();
+                }
+            }
+
+            int responseCode = connection.getResponseCode();
+            if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
+                return new HttpResponse<>(responseCode, connection.getHeaderFields(), null);
+            } else if (responseCode >= 400) {
+                InputStream es = connection.getErrorStream();
+                ErrorMessage errorMessage = JSON_SERDE.readValue(es, ErrorMessage.class);
+                es.close();
+                throw new CopycatRestException(responseCode, errorMessage.errorCode(), errorMessage.message());
+            } else if (responseCode >= 200 && responseCode < 300) {
+                InputStream is = connection.getInputStream();
+                T result = JSON_SERDE.readValue(is, responseFormat);
+                is.close();
+                return new HttpResponse<>(responseCode, connection.getHeaderFields(), result);
+            } else {
+                throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR,
+                        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+                        "Unexpected status code when handling forwarded request: " + responseCode);
+            }
+        } catch (IOException e) {
+            log.error("IO error forwarding REST request: ", e);
+            throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
+        } finally {
+            if (connection != null)
+                connection.disconnect();
+        }
+    }
+
+    public static class HttpResponse<T> {
+        private int status;
+        private Map<String, List<String>> headers;
+        private T body;
+
+        public HttpResponse(int status, Map<String, List<String>> headers, T body) {
+            this.status = status;
+            this.headers = headers;
+            this.body = body;
+        }
+
+        public int status() {
+            return status;
+        }
+
+        public Map<String, List<String>> headers() {
+            return headers;
+        }
+
+        public T body() {
+            return body;
+        }
+    }
+
+    public static String urlJoin(String base, String path) {
+        if (base.endsWith("/") && path.startsWith("/"))
+            return base + path.substring(1);
+        else
+            return base + path;
+    }
+}
\ No newline at end of file
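For illustration only (not part of the commit): a sketch of how a follower worker could use the httpRequest() and urlJoin() helpers above to forward a connector-creation request to the leader. The leader URL and connector config here are made-up placeholders; in the runtime the URL would come from the group's rebalance metadata.

    import com.fasterxml.jackson.core.type.TypeReference;
    import org.apache.kafka.copycat.runtime.rest.RestServer;
    import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
    import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest;

    import java.util.Collections;
    import java.util.Map;

    public class ForwardToLeaderSketch {
        public static void main(String[] args) {
            String leaderUrl = "http://worker-1:8083/";   // placeholder advertised URL
            Map<String, String> config = Collections.singletonMap(
                    "connector.class", "org.apache.kafka.copycat.file.FileStreamSourceConnector");

            // urlJoin avoids a doubled slash between the base URL and the path
            String url = RestServer.urlJoin(leaderUrl, "/connectors");
            RestServer.HttpResponse<ConnectorInfo> response = RestServer.httpRequest(
                    url, "POST", new CreateConnectorRequest("local-file-source", config),
                    new TypeReference<ConnectorInfo>() { });
            System.out.println("HTTP " + response.status() + ": created " + response.body().name());
        }
    }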

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java
new file mode 100644
index 0000000..2b047d3
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class ConnectorInfo {
+
+    private final String name;
+    private final Map<String, String> config;
+    private final List<ConnectorTaskId> tasks;
+
+    @JsonCreator
+    public ConnectorInfo(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config,
+                         @JsonProperty("tasks") List<ConnectorTaskId> tasks) {
+        this.name = name;
+        this.config = config;
+        this.tasks = tasks;
+    }
+
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    @JsonProperty
+    public Map<String, String> config() {
+        return config;
+    }
+
+    @JsonProperty
+    public List<ConnectorTaskId> tasks() {
+        return tasks;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConnectorInfo that = (ConnectorInfo) o;
+        return Objects.equals(name, that.name) &&
+                Objects.equals(config, that.config) &&
+                Objects.equals(tasks, that.tasks);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, config, tasks);
+    }
+
+    private static List<ConnectorTaskId> jsonTasks(Collection<ConnectorTaskId> tasks) {
+        // Defensive copy into a List so the task IDs serialize as a stable JSON array
+        return new ArrayList<>(tasks);
+    }
+}
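A small round-trip sketch (illustrative only; assumes Jackson can serialize ConnectorTaskId) showing the JSON shape the annotations above produce:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
    import org.apache.kafka.copycat.util.ConnectorTaskId;

    import java.util.Collections;

    public class ConnectorInfoSketch {
        public static void main(String[] args) throws Exception {
            ConnectorInfo info = new ConnectorInfo(
                    "local-file-source",
                    Collections.singletonMap("topic", "test"),
                    Collections.singletonList(new ConnectorTaskId("local-file-source", 0)));
            // Expected shape, roughly: {"name":"local-file-source","config":{"topic":"test"},"tasks":[...]}
            System.out.println(new ObjectMapper().writeValueAsString(info));
        }
    }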

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/CreateConnectorRequest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/CreateConnectorRequest.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/CreateConnectorRequest.java
new file mode 100644
index 0000000..02ff08b
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/CreateConnectorRequest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class CreateConnectorRequest {
+    private final String name;
+    private final Map<String, String> config;
+
+    @JsonCreator
+    public CreateConnectorRequest(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config) {
+        this.name = name;
+        this.config = config;
+    }
+
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    @JsonProperty
+    public Map<String, String> config() {
+        return config;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CreateConnectorRequest that = (CreateConnectorRequest) o;
+        return Objects.equals(name, that.name) &&
+                Objects.equals(config, that.config);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, config);
+    }
+}
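The @JsonCreator constructor is what lets the REST layer bind a POST /connectors body straight to this class; a minimal sketch of that deserialization:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest;

    public class CreateConnectorRequestSketch {
        public static void main(String[] args) throws Exception {
            String body = "{\"name\":\"local-file-source\",\"config\":{\"topic\":\"test\"}}";
            CreateConnectorRequest request = new ObjectMapper().readValue(body, CreateConnectorRequest.class);
            System.out.println(request.name() + " -> " + request.config());  // local-file-source -> {topic=test}
        }
    }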

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ErrorMessage.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ErrorMessage.java
new file mode 100644
index 0000000..6cbc140
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ErrorMessage.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Standard error format for all REST API failures. These are generated automatically by
+ * {@link org.apache.kafka.copycat.runtime.rest.errors.CopycatExceptionMapper} in response to uncaught
+ * {@link org.apache.kafka.copycat.errors.CopycatException}s.
+ */
+public class ErrorMessage {
+    private final int errorCode;
+    private final String message;
+
+    @JsonCreator
+    public ErrorMessage(@JsonProperty("error_code") int errorCode, @JsonProperty("message") String message) {
+        this.errorCode = errorCode;
+        this.message = message;
+    }
+
+    @JsonProperty("error_code")
+    public int errorCode() {
+        return errorCode;
+    }
+
+    @JsonProperty
+    public String message() {
+        return message;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ErrorMessage that = (ErrorMessage) o;
+        return errorCode == that.errorCode &&
+                Objects.equals(message, that.message);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(errorCode, message);
+    }
+}
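Because of the @JsonProperty("error_code") annotation, clients see snake_case on the wire; an illustrative round trip:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.copycat.runtime.rest.entities.ErrorMessage;

    public class ErrorMessageSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(new ErrorMessage(404, "Connector not found"));
            System.out.println(json);  // {"error_code":404,"message":"Connector not found"}
            System.out.println(mapper.readValue(json, ErrorMessage.class).errorCode());  // 404
        }
    }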

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ServerInfo.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ServerInfo.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ServerInfo.java
new file mode 100644
index 0000000..5393163
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ServerInfo.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.AppInfoParser;
+
+public class ServerInfo {
+    private final String version;
+    private final String commit;
+
+    public ServerInfo() {
+        version = AppInfoParser.getVersion();
+        commit = AppInfoParser.getCommitId();
+    }
+
+    @JsonProperty
+    public String version() {
+        return version;
+    }
+
+    @JsonProperty
+    public String commit() {
+        return commit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/TaskInfo.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/TaskInfo.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/TaskInfo.java
new file mode 100644
index 0000000..9206ea0
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/TaskInfo.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskInfo {
+    private final ConnectorTaskId id;
+    private final Map<String, String> config;
+
+    // Mirror the other REST entities: a @JsonCreator constructor so Jackson can bind this class when deserializing
+    @JsonCreator
+    public TaskInfo(@JsonProperty("id") ConnectorTaskId id, @JsonProperty("config") Map<String, String> config) {
+        this.id = id;
+        this.config = config;
+    }
+
+    @JsonProperty
+    public ConnectorTaskId id() {
+        return id;
+    }
+
+    @JsonProperty
+    public Map<String, String> config() {
+        return config;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TaskInfo taskInfo = (TaskInfo) o;
+        return Objects.equals(id, taskInfo.id) &&
+                Objects.equals(config, taskInfo.config);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatExceptionMapper.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatExceptionMapper.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatExceptionMapper.java
new file mode 100644
index 0000000..760200c
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatExceptionMapper.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.errors;
+
+import org.apache.kafka.copycat.errors.AlreadyExistsException;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.NotFoundException;
+import org.apache.kafka.copycat.runtime.rest.entities.ErrorMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+
+public class CopycatExceptionMapper implements ExceptionMapper<CopycatException> {
+    private static final Logger log = LoggerFactory.getLogger(CopycatExceptionMapper.class);
+
+    @Override
+    public Response toResponse(CopycatException exception) {
+        log.debug("Uncaught exception in REST call: ", exception);
+
+        if (exception instanceof CopycatRestException) {
+            CopycatRestException restException = (CopycatRestException) exception;
+            return Response.status(restException.statusCode())
+                    .entity(new ErrorMessage(restException.errorCode(), restException.getMessage()))
+                    .build();
+        }
+
+        if (exception instanceof NotFoundException) {
+            return Response.status(Response.Status.NOT_FOUND)
+                    .entity(new ErrorMessage(Response.Status.NOT_FOUND.getStatusCode(), exception.getMessage()))
+                    .build();
+        }
+
+        if (exception instanceof AlreadyExistsException) {
+            return Response.status(Response.Status.CONFLICT)
+                    .entity(new ErrorMessage(Response.Status.CONFLICT.getStatusCode(), exception.getMessage()))
+                    .build();
+        }
+
+        return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                .entity(new ErrorMessage(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.getMessage()))
+                .build();
+    }
+}
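In effect the mapper gives every resource one error contract: a CopycatRestException keeps its own status and error code, NotFoundException maps to 404, AlreadyExistsException to 409, and any other CopycatException falls through to 500, always with an ErrorMessage body. An illustrative resource method relying on that behavior (the lookup is a stand-in, and it assumes NotFoundException takes a String message):

    import org.apache.kafka.copycat.errors.NotFoundException;

    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;

    @Path("/connectors")
    public class ExampleResource {
        @GET
        @Path("/{connector}")
        public String getConnector(@PathParam("connector") String name) {
            // A real resource would consult the Herder; this stand-in only knows one connector
            if (!"local-file-source".equals(name))
                throw new NotFoundException("Connector " + name + " not found");  // mapper renders this as 404 + ErrorMessage
            return name;
        }
    }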

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatRestException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatRestException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatRestException.java
new file mode 100644
index 0000000..efcf69d
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/errors/CopycatRestException.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.rest.errors;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+
+import javax.ws.rs.core.Response;
+
+public class CopycatRestException extends CopycatException {
+    private final int statusCode;
+    private final int errorCode;
+
+    public CopycatRestException(int statusCode, int errorCode, String message, Throwable t) {
+        super(message, t);
+        this.statusCode = statusCode;
+        this.errorCode = errorCode;
+    }
+
+    public CopycatRestException(Response.Status status, int errorCode, String message, Throwable t) {
+        this(status.getStatusCode(), errorCode, message, t);
+    }
+
+    public CopycatRestException(int statusCode, int errorCode, String message) {
+        this(statusCode, errorCode, message, null);
+    }
+
+    public CopycatRestException(Response.Status status, int errorCode, String message) {
+        this(status, errorCode, message, null);
+    }
+
+    public CopycatRestException(int statusCode, String message, Throwable t) {
+        this(statusCode, statusCode, message, t);
+    }
+
+    public CopycatRestException(Response.Status status, String message, Throwable t) {
+        this(status, status.getStatusCode(), message, t);
+    }
+
+    public CopycatRestException(int statusCode, String message) {
+        this(statusCode, statusCode, message, null);
+    }
+
+    public CopycatRestException(Response.Status status, String message) {
+        this(status.getStatusCode(), status.getStatusCode(), message, null);
+    }
+
+    public int statusCode() {
+        return statusCode;
+    }
+
+    public int errorCode() {
+        return errorCode;
+    }
+}
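Note the convention in the shorter constructors above: when no separate error code is supplied, the HTTP status code doubles as the error code. An illustrative check:

    import org.apache.kafka.copycat.runtime.rest.errors.CopycatRestException;

    import javax.ws.rs.core.Response;

    public class CopycatRestExceptionSketch {
        public static void main(String[] args) {
            CopycatRestException e = new CopycatRestException(Response.Status.CONFLICT, "Connector already exists");
            System.out.println(e.statusCode() + " / " + e.errorCode());  // 409 / 409
        }
    }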

