kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3829: Ensure valid configuration prior to creating connector
Date Sun, 13 Nov 2016 00:13:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4f1d8c685 -> b7d36b726


KAFKA-3829: Ensure valid configuration prior to creating connector

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Shikhar Bhushan <shikhar@schmizz.net>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1911 from hachikuji/KAFKA-3829


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b7d36b72
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b7d36b72
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b7d36b72

Branch: refs/heads/trunk
Commit: b7d36b7261ddefea68667a65cc5e2ee0734ed4a1
Parents: 4f1d8c6
Author: Jason Gustafson <jason@confluent.io>
Authored: Sat Nov 12 16:11:04 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Sat Nov 12 16:11:28 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/common/config/ConfigDef.java   |  10 +-
 .../apache/kafka/common/config/ConfigValue.java |   4 +-
 .../kafka/connect/runtime/AbstractHerder.java   |  43 +++--
 .../apache/kafka/connect/runtime/Herder.java    |  12 +-
 .../kafka/connect/runtime/WorkerSinkTask.java   |   3 +-
 .../runtime/distributed/DistributedConfig.java  |   5 +-
 .../runtime/distributed/DistributedHerder.java  |  72 ++++++--
 .../runtime/rest/entities/ErrorMessage.java     |   4 +-
 .../rest/errors/BadRequestException.java        |  27 +++
 .../resources/ConnectorPluginsResource.java     |   8 +-
 .../rest/resources/ConnectorsResource.java      |   2 +-
 .../runtime/standalone/StandaloneHerder.java    |  58 ++++---
 .../apache/kafka/connect/util/SinkUtils.java    |  27 +++
 .../distributed/DistributedHerderTest.java      | 168 ++++++++++++++++++-
 .../resources/ConnectorPluginsResourceTest.java |   2 +-
 .../rest/resources/ConnectorsResourceTest.java  |   6 +-
 .../standalone/StandaloneHerderTest.java        | 141 +++++++++++++---
 17 files changed, 498 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 08c3617..33f60a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -438,6 +438,10 @@ public class ConfigDef {
      * the current configuration values.
      */
     public List<ConfigValue> validate(Map<String, String> props) {
+        return new ArrayList<>(validateAll(props).values());
+    }
+
+    public Map<String, ConfigValue> validateAll(Map<String, String> props) {
         Map<String, ConfigValue> configValues = new HashMap<>();
         for (String name: configKeys.keySet()) {
             configValues.put(name, new ConfigValue(name));
@@ -466,12 +470,12 @@ public class ConfigDef {
     }
 
 
-    private List<ConfigValue> validate(Map<String, Object> parsed, Map<String, ConfigValue> configValues) {
+    private Map<String, ConfigValue> validate(Map<String, Object> parsed, Map<String, ConfigValue> configValues) {
         Set<String> configsWithNoParent = getConfigsWithNoParent();
         for (String name: configsWithNoParent) {
             validate(name, parsed, configValues);
         }
-        return new LinkedList<>(configValues.values());
+        return configValues;
     }
 
     private List<String> undefinedDependentConfigs() {
@@ -485,7 +489,7 @@ public class ConfigDef {
                 }
             }
         }
-        return new LinkedList<>(undefinedConfigKeys);
+        return new ArrayList<>(undefinedConfigKeys);
     }
 
     private Set<String> getConfigsWithNoParent() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java
index c9a4a34..985e05f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.common.config;
 
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -30,7 +30,7 @@ public class ConfigValue {
     private boolean visible;
 
     public ConfigValue(String name) {
-        this(name, null, new LinkedList<Object>(), new LinkedList<String>());
+        this(name, null, new ArrayList<>(), new ArrayList<String>());
     }
 
     public ConfigValue(String name, Object value, List<Object> recommendedValues, List<String> errorMessages) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index b7a0e67..ba4894b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -237,9 +238,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
                 status.workerId(), status.trace());
     }
 
+    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector,
+                                                                    ConfigDef configDef,
+                                                                    Map<String, String> config) {
+        return configDef.validateAll(config);
+    }
 
     @Override
-    public ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig) {
+    public ConfigInfos validateConnectorConfig(Map<String, String> connectorConfig) {
+        String connType = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        if (connType == null)
+            throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type");
+
         Connector connector = getConnector(connType);
         ConfigDef connectorConfigDef;
         if (connector instanceof SourceConnector) {
@@ -247,22 +257,25 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         } else {
             connectorConfigDef = SinkConnectorConfig.configDef();
         }
-        List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig);
-        
-        Config config = connector.validate(connectorConfig);
-        ConfigDef configDef = connector.config();
-        Map<String, ConfigKey> configKeys = configDef.configKeys();
-        List<ConfigValue> configValues = config.configValues();
 
-        Map<String, ConfigKey> resultConfigKeys = new HashMap<>(configKeys);
-        resultConfigKeys.putAll(connectorConfigDef.configKeys());
-        configValues.addAll(connectorConfigValues);
+        List<ConfigValue> configValues = new ArrayList<>();
+        Map<String, ConfigKey> configKeys = new HashMap<>();
+        List<String> allGroups = new ArrayList<>();
 
-        List<String> allGroups = new LinkedList<>(connectorConfigDef.groups());
-        List<String> groups = configDef.groups();
-        allGroups.addAll(groups);
+        // do basic connector validation (name, connector type, etc.)
+        Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(connector, connectorConfigDef, connectorConfig);
+        configValues.addAll(validatedConnectorConfig.values());
+        configKeys.putAll(connectorConfigDef.configKeys());
+        allGroups.addAll(connectorConfigDef.groups());
+
+        // do custom connector-specific validation
+        Config config = connector.validate(connectorConfig);
+        ConfigDef configDef = connector.config();
+        configKeys.putAll(configDef.configKeys());
+        allGroups.addAll(configDef.groups());
+        configValues.addAll(config.configValues());
 
-        return generateResult(connType, resultConfigKeys, configValues, allGroups);
+        return generateResult(connType, configKeys, configValues, allGroups);
     }
 
     public static List<ConnectorPluginInfo> connectorPlugins() {
@@ -357,7 +370,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         return new ConfigValueInfo(configValue.name(), value, recommendedValues, configValue.errorMessages(), configValue.visible());
     }
 
-    private Connector getConnector(String connType) {
+    protected Connector getConnector(String connType) {
         if (tempConnectors.containsKey(connType)) {
             return tempConnectors.get(connType);
         } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index ce8bcf9..b86d6cb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -82,7 +82,7 @@ public interface Herder {
     void connectorConfig(String connName, Callback<Map<String, String>> callback);
 
     /**
-     * Set the configuration for a connector. This supports creation, update, and deletion.
+     * Set the configuration for a connector. This supports creation and updating.
      * @param connName name of the connector
      * @param config the connectors configuration, or null if deleting the connector
      * @param allowReplace if true, allow overwriting previous configs; if false, throw AlreadyExistsException if a connector
@@ -92,6 +92,13 @@ public interface Herder {
     void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
 
     /**
+     * Delete a connector and its configuration.
+     * @param connName name of the connector
+     * @param callback callback to invoke when the configuration has been written
+     */
+    void deleteConnectorConfig(String connName, Callback<Created<ConnectorInfo>> callback);
+
+    /**
      * Requests reconfiguration of the task. This should only be triggered by
      * {@link HerderConnectorContext}.
      *
@@ -130,10 +137,9 @@ public interface Herder {
 
     /**
      * Validate the provided connector config values against the configuration definition.
-     * @param connType the connector class
      * @param connectorConfig the provided connector config values
      */
-    ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig);
+    ConfigInfos validateConnectorConfig(Map<String, String> connectorConfig);
 
     /**
      * Restart the task with the given id.

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index a48489e..1575581 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -37,6 +37,7 @@ import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -331,7 +332,7 @@ class WorkerSinkTask extends WorkerTask {
         // and through to the task
         Map<String, Object> props = new HashMap<>();
 
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector()));
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                 Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 1617d59..c7e585c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -105,7 +105,10 @@ public class DistributedConfig extends WorkerConfig {
 
     static {
         CONFIG = baseConfigDef()
-                .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC)
+                .define(GROUP_ID_CONFIG,
+                        ConfigDef.Type.STRING,
+                        ConfigDef.Importance.HIGH,
+                        GROUP_ID_DOC)
                 .define(SESSION_TIMEOUT_MS_CONFIG,
                         ConfigDef.Type.INT,
                         10000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index ef617a7..170c983 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -17,9 +17,12 @@
 
 package org.apache.kafka.connect.runtime.distributed;
 
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -32,12 +35,16 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
+import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,6 +101,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     private final Time time;
 
+    private final String workerGroupId;
     private final int workerSyncTimeoutMs;
     private final int workerUnsyncBackoffMs;
 
@@ -143,6 +151,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         super(worker, workerId, statusBackingStore, configStorage);
 
         this.time = time;
+        this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
         this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
         this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
         this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(), time);
@@ -429,14 +438,64 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
+    public void deleteConnectorConfig(final String connName, final Callback<Created<ConnectorInfo>> callback) {
+        addRequest(
+                new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        log.trace("Handling connector config request {}", connName);
+                        if (!isLeader()) {
+                            callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
+                            return null;
+                        }
+
+                        if (!configState.contains(connName)) {
+                            callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
+                        } else {
+                            log.trace("Removing connector config {} {}", connName, configState.connectors());
+                            configBackingStore.removeConnectorConfig(connName);
+                            callback.onCompletion(null, new Created<ConnectorInfo>(false, null));
+                        }
+                        return null;
+                    }
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
+    @Override
+    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector,
+                                                                    ConfigDef configDef,
+                                                                    Map<String, String> config) {
+        Map<String, ConfigValue> validatedConfig = super.validateBasicConnectorConfig(connector, configDef, config);
+        if (connector instanceof SinkConnector) {
+            ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG);
+            String name = (String) validatedName.value();
+
+            if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
+                validatedName.addErrorMessage("Consumer group for sink connector named " + name +
+                        " conflicts with Connect worker group " + workerGroupId);
+            }
+        }
+        return validatedConfig;
+    }
+
+
+    @Override
     public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace,
                                    final Callback<Created<ConnectorInfo>> callback) {
         log.trace("Submitting connector config write request {}", connName);
-
         addRequest(
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
+                        ConfigInfos validatedConfig = validateConnectorConfig(config);
+                        if (validatedConfig.errorCount() > 0) {
+                            callback.onCompletion(new BadRequestException("Connector configuration is invalid " +
+                                    "(use the endpoint `/{connectorType}/config/validate` to get a full list of errors)"), null);
+                            return null;
+                        }
+
                         log.trace("Handling connector config request {}", connName);
                         if (!isLeader()) {
                             callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
@@ -449,17 +508,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                             return null;
                         }
 
-                        if (config == null) {
-                            if (!exists) {
-                                callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
-                            } else {
-                                log.trace("Removing connector config {} {} {}", connName, allowReplace, configState.connectors());
-                                configBackingStore.removeConnectorConfig(connName);
-                                callback.onCompletion(null, new Created<ConnectorInfo>(false, null));
-                            }
-                            return null;
-                        }
-
                         log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
                         configBackingStore.putConnectorConfig(connName, config);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
index 493b00d..e9381d9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
@@ -24,8 +24,8 @@ import java.util.Objects;
 
 /**
  * Standard error format for all REST API failures. These are generated automatically by
- * {@link ConnectExceptionMapper} in response to uncaught
- * {@link ConnectException}s.
+ * {@link org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper} in response to uncaught
+ * {@link org.apache.kafka.connect.errors.ConnectException}s.
  */
 public class ErrorMessage {
     private final int errorCode;

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java
new file mode 100644
index 0000000..5fcb226
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime.rest.errors;
+
+import javax.ws.rs.core.Response;
+
+public class BadRequestException extends ConnectRestException {
+
+    public BadRequestException(String message) {
+        super(Response.Status.BAD_REQUEST, message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 9e87d0c..519aa9a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import org.apache.kafka.connect.runtime.AbstractHerder;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
@@ -25,6 +26,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 import java.util.List;
 import java.util.Map;
 
+import javax.ws.rs.BadRequestException;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
 import javax.ws.rs.PUT;
@@ -48,7 +50,11 @@ public class ConnectorPluginsResource {
     @Path("/{connectorType}/config/validate")
     public ConfigInfos validateConfigs(final @PathParam("connectorType") String connType,
                                        final Map<String, String> connectorConfig) throws Throwable {
-        return herder.validateConfigs(connType, connectorConfig);
+        String includedConnType = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        if (includedConnType != null && !includedConnType.equals(connType))
+            throw new BadRequestException("Included connector type " + includedConnType + " does not match request type " + connType);
+
+        return herder.validateConnectorConfig(connectorConfig);
     }
 
     @GET

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 39e9a0f..e8f134d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -218,7 +218,7 @@ public class ConnectorsResource {
     public void destroyConnector(final @PathParam("connector") String connector,
                                  final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder.putConnectorConfig(connector, null, true, cb);
+        herder.deleteConnectorConfig(connector, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 2015f27..ff09c1c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -28,8 +28,10 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
 import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
@@ -129,47 +131,59 @@ public class StandaloneHerder extends AbstractHerder {
     }
 
     @Override
+    public synchronized void deleteConnectorConfig(String connName, Callback<Created<ConnectorInfo>> callback) {
+        try {
+            if (!configState.contains(connName)) {
+                // Deletion, must already exist
+                callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+                return;
+            }
+
+            removeConnectorTasks(connName);
+            worker.stopConnector(connName);
+            configBackingStore.removeConnectorConfig(connName);
+            onDeletion(connName);
+            callback.onCompletion(null, new Created<ConnectorInfo>(false, null));
+        } catch (ConnectException e) {
+            callback.onCompletion(e, null);
+        }
+
+    }
+
+    @Override
     public synchronized void putConnectorConfig(String connName,
                                                 final Map<String, String> config,
                                                 boolean allowReplace,
                                                 final Callback<Created<ConnectorInfo>> callback) {
         try {
+            ConfigInfos validatedConfig = validateConnectorConfig(config);
+            if (validatedConfig.errorCount() > 0) {
+                callback.onCompletion(new BadRequestException("Connector configuration is invalid " +
+                        "(use the endpoint `/{connectorType}/config/validate` to get a full list of errors)"), null);
+                return;
+            }
+
             boolean created = false;
             if (configState.contains(connName)) {
                 if (!allowReplace) {
                     callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
                     return;
                 }
-                if (config == null) // Deletion, kill tasks as well
-                    removeConnectorTasks(connName);
                 worker.stopConnector(connName);
-                if (config == null) {
-                    configBackingStore.removeConnectorConfig(connName);
-                    onDeletion(connName);
-                }
             } else {
-                if (config == null) {
-                    // Deletion, must already exist
-                    callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
-                    return;
-                }
                 created = true;
             }
-            if (config != null) {
-                if (!startConnector(config)) {
-                    callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
-                    return;
-                }
-                updateConnectorTasks(connName);
+
+            if (!startConnector(config)) {
+                callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
+                return;
             }
-            if (config != null)
-                callback.onCompletion(null, new Created<>(created, createConnectorInfo(connName)));
-            else
-                callback.onCompletion(null, new Created<ConnectorInfo>(false, null));
+
+            updateConnectorTasks(connName);
+            callback.onCompletion(null, new Created<>(created, createConnectorInfo(connName)));
         } catch (ConnectException e) {
             callback.onCompletion(e, null);
         }
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
new file mode 100644
index 0000000..cb1ef48
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+public final class SinkUtils {
+
+    private SinkUtils() {}
+
+    public static String consumerGroupId(String connector) {
+        return "connect-" + connector;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index e4ebc89..1da4595 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -18,11 +18,16 @@
 package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.ConnectorFactory;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
@@ -31,6 +36,8 @@ import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
+import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
@@ -39,6 +46,7 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Before;
@@ -61,6 +69,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -74,7 +83,7 @@ public class DistributedHerderTest {
         HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
         HERDER_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
         HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-group");
+        HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
         // The WorkerConfig base class has some required settings without defaults
         HERDER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
         HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
@@ -293,6 +302,15 @@ public class DistributedHerderTest {
 
         member.wakeup();
         PowerMock.expectLastCall();
+
+        // config validation
+        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
+        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
+        EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+
         // CONN2 is new, should succeed
         configStorage.putConnectorConfig(CONN2, CONN2_CONFIG);
         PowerMock.expectLastCall();
@@ -312,6 +330,133 @@ public class DistributedHerderTest {
     }
 
     @Test
+    public void testCreateConnectorFailedBasicValidation() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        HashMap<String, String> config = new HashMap<>(CONN2_CONFIG);
+        config.remove(ConnectorConfig.NAME_CONFIG);
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // config validation
+        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
+        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
+        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+
+        // CONN2 creation should fail
+
+        Capture<Throwable> error = EasyMock.newCapture();
+        putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // Nothing else is expected: the request is rejected with BadRequestException and no config is written
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONN2, config, false, putConnectorCallback);
+        herder.tick();
+
+        assertTrue(error.hasCaptured());
+        assertTrue(error.getValue() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateConnectorFailedCustomValidation() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // config validation
+        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
+        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+
+        ConfigDef configDef = new ConfigDef();
+        configDef.define("foo.bar", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "foo.bar doc");
+        EasyMock.expect(connectorMock.config()).andReturn(configDef);
+
+        ConfigValue validatedValue = new ConfigValue("foo.bar");
+        validatedValue.addErrorMessage("Failed foo.bar validation");
+        EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(singletonList(validatedValue)));
+
+        // CONN2 creation should fail
+
+        Capture<Throwable> error = EasyMock.newCapture();
+        putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // Nothing else is expected: the request is rejected with BadRequestException and no config is written
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
+        herder.tick();
+
+        assertTrue(error.hasCaptured());
+        assertTrue(error.getValue() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorNameConflictsWithWorkerGroupId() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        Map<String, String> config = new HashMap<>(CONN2_CONFIG);
+        config.put(ConnectorConfig.NAME_CONFIG, "test-group");
+
+        // config validation
+        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
+        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
+        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
+        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+
+        // CONN2 creation should fail because the worker group id (connect-test-group) conflicts with
+        // the consumer group id we would use for this sink
+
+        Capture<Throwable> error = EasyMock.newCapture();
+        putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.isNull(Herder.Created.class));
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // Nothing else is expected: the request is rejected with BadRequestException and no config is written
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONN2, config, false, putConnectorCallback);
+        herder.tick();
+
+        assertTrue(error.hasCaptured());
+        assertTrue(error.getValue() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testCreateConnectorAlreadyExists() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
@@ -359,7 +504,7 @@ public class DistributedHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONN1, null, true, putConnectorCallback);
+        herder.deleteConnectorConfig(CONN1, putConnectorCallback);
         herder.tick();
 
         PowerMock.verifyAll();
@@ -371,7 +516,7 @@ public class DistributedHerderTest {
 
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
-        expectRebalance(1, Collections.singletonList(CONN1), Collections.<ConnectorTaskId>emptyList());
+        expectRebalance(1, singletonList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -519,7 +664,7 @@ public class DistributedHerderTest {
 
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
-        expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
+        expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -827,7 +972,7 @@ public class DistributedHerderTest {
         EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
 
         // join
-        expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
+        expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
@@ -864,7 +1009,7 @@ public class DistributedHerderTest {
         EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet());
 
         // join
-        expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
+        expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
@@ -904,7 +1049,7 @@ public class DistributedHerderTest {
         EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet());
 
         // join
-        expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
+        expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
         worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
@@ -1076,6 +1221,15 @@ public class DistributedHerderTest {
         // Poll loop for second round of calls
         member.ensureActive();
         PowerMock.expectLastCall();
+
+        // config validation
+        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
+        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
+        EasyMock.expect(connectorMock.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+
         configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index e8ee93d..b974559 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -129,7 +129,7 @@ public class ConnectorPluginsResourceTest {
 
     @Test
     public void testValidateConfig() throws Throwable {
-        herder.validateConfigs(EasyMock.eq(ConnectorPluginsResourceTestConnector.class.getName()), EasyMock.eq(props));
+        herder.validateConnectorConfig(EasyMock.eq(props));
 
         PowerMock.expectLastCall().andAnswer(new IAnswer<ConfigInfos>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 9e97087..1c8bd38 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -214,7 +214,7 @@ public class ConnectorsResourceTest {
     @Test
     public void testDeleteConnector() throws Throwable {
         final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb));
+        herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
         expectAndCallbackResult(cb, null);
 
         PowerMock.replayAll();
@@ -227,7 +227,7 @@ public class ConnectorsResourceTest {
     @Test
     public void testDeleteConnectorNotLeader() throws Throwable {
         final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb));
+        herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
         EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", null, null))
@@ -244,7 +244,7 @@ public class ConnectorsResourceTest {
     @Test(expected = NotFoundException.class)
     public void testDeleteConnectorNotFound() throws Throwable {
         final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb));
+        herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
         expectAndCallbackException(cb, new NotFoundException("not found"));
 
         PowerMock.replayAll();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d36b72/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 971d84f..0bc3d5c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.connect.runtime.standalone;
 
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
@@ -25,6 +28,7 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.ConnectorFactory;
 import org.apache.kafka.connect.runtime.ConnectorStatus;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
@@ -34,6 +38,7 @@ import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceConnector;
@@ -61,6 +66,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -95,9 +101,64 @@ public class StandaloneHerderTest {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        expectConfigValidation(config);
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateConnectorFailedBasicValidation() throws Exception {
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        config.remove(ConnectorConfig.NAME_CONFIG);
+
+        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
+        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+
+        createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
+        PowerMock.expectLastCall();
+
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateConnectorFailedCustomValidation() throws Exception {
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
+        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+
+        ConfigDef configDef = new ConfigDef();
+        configDef.define("foo.bar", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "foo.bar doc");
+        EasyMock.expect(connectorMock.config()).andReturn(configDef);
+
+        ConfigValue validatedValue = new ConfigValue("foo.bar");
+        validatedValue.addErrorMessage("Failed foo.bar validation");
+        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue)));
+
+        createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
 
         PowerMock.verifyAll();
     }
@@ -108,14 +169,17 @@ public class StandaloneHerderTest {
         // First addition should succeed
         expectAdd(SourceSink.SOURCE);
 
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        expectConfigValidation(config, config);
+
         // Second should fail
         createCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
 
         PowerMock.verifyAll();
     }
@@ -125,9 +189,12 @@ public class StandaloneHerderTest {
         connector = PowerMock.createMock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
+        Map<String, String> config = connectorConfig(SourceSink.SINK);
+        expectConfigValidation(config);
+
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SINK), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
 
         PowerMock.verifyAll();
     }
@@ -137,6 +204,9 @@ public class StandaloneHerderTest {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        expectConfigValidation(config);
+
         EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
         statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
 
@@ -144,14 +214,14 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
         FutureCallback<Herder.Created<ConnectorInfo>> futureCb = new FutureCallback<>();
-        herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb);
+        herder.deleteConnectorConfig(CONNECTOR_NAME, futureCb);
         futureCb.get(1000L, TimeUnit.MILLISECONDS);
 
         // Second deletion should fail since the connector is gone
         futureCb = new FutureCallback<>();
-        herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb);
+        herder.deleteConnectorConfig(CONNECTOR_NAME, futureCb);
         try {
             futureCb.get(1000L, TimeUnit.MILLISECONDS);
             fail("Should have thrown NotFoundException");
@@ -166,16 +236,19 @@ public class StandaloneHerderTest {
     public void testRestartConnector() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        expectConfigValidation(config);
+
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(true);
 
-        worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorConfig(SourceSink.SOURCE)),
+        worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(config),
                 EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall().andReturn(true);
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, cb);
@@ -188,16 +261,19 @@ public class StandaloneHerderTest {
     public void testRestartConnectorFailureOnStart() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        expectConfigValidation(config);
+
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(true);
 
-        worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorConfig(SourceSink.SOURCE)),
+        worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(config),
                 EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall().andReturn(false);
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, cb);
@@ -216,15 +292,18 @@ public class StandaloneHerderTest {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
         expectAdd(SourceSink.SOURCE);
 
+        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
+        expectConfigValidation(connectorConfig);
+
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall().andReturn(true);
 
-        worker.startTask(taskId, connectorConfig(SourceSink.SOURCE), taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
+        worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(true);
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartTask(taskId, cb);
@@ -238,15 +317,18 @@ public class StandaloneHerderTest {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
         expectAdd(SourceSink.SOURCE);
 
+        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
+        expectConfigValidation(connectorConfig);
+
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall().andReturn(true);
 
-        worker.startTask(taskId, connectorConfig(SourceSink.SOURCE), taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
+        worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(false);
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartTask(taskId, cb);
@@ -264,6 +346,10 @@ public class StandaloneHerderTest {
     public void testCreateAndStop() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
+
+        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
+        expectConfigValidation(connectorConfig);
+
         // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
         expectStop();
 
@@ -274,7 +360,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, createCallback);
         herder.stop();
 
         PowerMock.verifyAll();
@@ -299,10 +385,10 @@ public class StandaloneHerderTest {
         taskConfigsCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.<List<TaskInfo>>isNull());
         EasyMock.expectLastCall();
 
-
         // Create connector
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
+        expectConfigValidation(connConfig);
 
         // Validate accessors with 1 connector
         listConnectorsCb.onCompletion(null, Collections.singleton(CONNECTOR_NAME));
@@ -347,6 +433,8 @@ public class StandaloneHerderTest {
         // Create
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
+        expectConfigValidation(connConfig, newConnConfig);
+
         // Should get first config
         connectorConfigCb.onCompletion(null, connConfig);
         EasyMock.expectLastCall();
@@ -360,7 +448,7 @@ public class StandaloneHerderTest {
         EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
         // Generate same task config, which should result in no additional action to restart tasks
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null))
-                .andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
+                .andReturn(singletonList(taskConfig(SourceSink.SOURCE)));
         worker.isSinkConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(false);
         ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
@@ -413,7 +501,7 @@ public class StandaloneHerderTest {
         Map<String, String> generatedTaskProps = taskConfig(sourceSink);
 
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null))
-            .andReturn(Collections.singletonList(generatedTaskProps));
+            .andReturn(singletonList(generatedTaskProps));
 
         worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(true);
@@ -424,7 +512,7 @@ public class StandaloneHerderTest {
 
     private void expectStop() {
         ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
-        worker.stopAndAwaitTasks(Collections.singletonList(task));
+        worker.stopAndAwaitTasks(singletonList(task));
         EasyMock.expectLastCall().andReturn(Collections.singleton(task));
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(true);
@@ -456,6 +544,19 @@ public class StandaloneHerderTest {
         return generatedTaskProps;
     }
 
+
+    private void expectConfigValidation(Map<String, String> ... configs) {
+        // config validation
+        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
+        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+
+        for (Map<String, String> config : configs)
+            EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+    }
+
     // We need to use a real class here due to some issue with mocking java.lang.Class
     private abstract class BogusSourceConnector extends SourceConnector {
     }


Mime
View raw message