kafka-commits mailing list archives

From ewe...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6886: Externalize secrets from Connect configs (KIP-297)
Date Wed, 30 May 2018 21:43:46 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 08e8fac  KAFKA-6886: Externalize secrets from Connect configs (KIP-297)
08e8fac is described below

commit 08e8facdc9fce3a9195f5f646b49f55ffa043c73
Author: Robert Yokota <rayokota@gmail.com>
AuthorDate: Wed May 30 14:43:11 2018 -0700

    KAFKA-6886: Externalize secrets from Connect configs (KIP-297)
    
    This commit allows secrets in Connect configs to be externalized and replaced with variable references of the form `${provider:[path:]key}`, where the "path" is optional.
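
    As a rough illustration of the reference syntax, the sketch below applies the same regular expression that this commit defines as `DEFAULT_PATTERN` in `ConfigTransformer` and splits a reference into provider, path, and key. The class name `VariableReferenceSketch`, the `parse` helper, and the provider name `vault` are hypothetical and are not part of the commit.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VariableReferenceSketch {
    // Same regular expression as DEFAULT_PATTERN in ConfigTransformer from this commit.
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");

    /** Returns {provider, path, key} for the first reference in value, or null if there is none. */
    public static String[] parse(String value) {
        Matcher m = REFERENCE.matcher(value);
        if (!m.find()) {
            return null;
        }
        // The path group is optional; an absent path is treated as the empty string.
        String path = m.group(3) != null ? m.group(3) : "";
        return new String[] {m.group(1), path, m.group(4)};
    }

    public static void main(String[] args) {
        // Reference with a path: provider "file", path "/tmp/properties.txt", key "fileKey".
        System.out.println(String.join(" | ", parse("${file:/tmp/properties.txt:fileKey}")));
        // Reference without a path ("vault" is a made-up provider name).
        System.out.println(String.join(" | ", parse("${vault:dbPassword}")));
    }
}
```

    A value that contains no reference makes `parse` return null, which corresponds to a config value that is left as-is.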
    
    There are 2 main additions to `org.apache.kafka.common.config`: a `ConfigProvider` and a `ConfigTransformer`.  The `ConfigProvider` is an interface that allows key-value pairs to be provided by an external source for a given "path".  A TTL can be associated with the key-value pairs returned from the path.  The `ConfigTransformer` will use instances of `ConfigProvider` to replace variable references in a set of configuration values.
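
    The interplay between the two can be sketched without any Kafka dependency. The code below is a simplified stand-in, not the classes added by this commit: `TransformSketch` and its nested `Provider` interface are hypothetical names, the provider returns a plain Map instead of `ConfigData`, and TTL handling is left out. It performs a single replacement pass and leaves a reference untouched when the provider or key is unknown, which is the behavior exercised by `ConfigTransformerTest` in the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TransformSketch {
    /** Hypothetical stand-in for ConfigProvider: returns the key-value pairs stored at a path. */
    public interface Provider {
        Map<String, String> get(String path);
    }

    // Same regular expression as DEFAULT_PATTERN in ConfigTransformer from this commit.
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");

    /** Replaces every resolvable reference in the config values; the input map is not modified. */
    public static Map<String, String> transform(Map<String, Provider> providers,
                                                Map<String, String> configs) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> config : configs.entrySet()) {
            Matcher m = REFERENCE.matcher(config.getValue());
            StringBuffer out = new StringBuffer();
            while (m.find()) {
                Provider provider = providers.get(m.group(1));
                String path = m.group(3) != null ? m.group(3) : "";
                String replacement = provider == null ? null : provider.get(path).get(m.group(4));
                // Unknown providers or keys leave the reference text as it was.
                String text = replacement != null ? replacement : m.group(0);
                m.appendReplacement(out, Matcher.quoteReplacement(text));
            }
            m.appendTail(out);
            result.put(config.getKey(), out.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        // A provider registered under the name "file" that knows a single key.
        Map<String, Provider> providers = Collections.singletonMap(
                "file", path -> Collections.singletonMap("fileKey", "someValue"));
        Map<String, String> configs = Collections.singletonMap(
                "someKey", "${file:/tmp/properties.txt:fileKey}");
        System.out.println(transform(providers, configs));
    }
}
```

    Because replaced text is not scanned again, a looked-up value that itself looks like a reference is kept literally, giving a single level of indirection.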
    
    In the Connect framework, `ConfigProvider` classes can be specified in the worker config, and then variable references can be used in the connector config.  In addition, the herder can be configured to restart connectors (or not) based on the TTL returned from a `ConfigProvider`.  The main class that performs restarts and transformations is `WorkerConfigTransformer`.
    
    Finally, a `configs()` method has been added to both `SourceTaskContext` and `SinkTaskContext`.  This allows connectors to get configs with variables replaced by the latest values from instances of `ConfigProvider`.
    
    Most of the other changes in the Connect framework are threading various objects through classes to enable the above functionality.
    
    Author: Robert Yokota <rayokota@gmail.com>
    Author: Ewen Cheslack-Postava <me@ewencp.org>
    
    Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #5068 from rayokota/KAFKA-6886-connect-secrets
---
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/common/config/ConfigChangeCallback.java  |  17 ++-
 .../org/apache/kafka/common/config/ConfigData.java |  66 ++++++++
 .../apache/kafka/common/config/ConfigProvider.java |  78 ++++++++++
 .../kafka/common/config/ConfigTransformer.java     | 169 +++++++++++++++++++++
 .../common/config/ConfigTransformerResult.java     |  61 ++++++++
 .../kafka/common/config/FileConfigProvider.java    | 101 ++++++++++++
 .../kafka/common/config/ConfigTransformerTest.java | 117 ++++++++++++++
 .../common/config/FileConfigProviderTest.java      |  95 ++++++++++++
 .../apache/kafka/connect/sink/SinkTaskContext.java |  10 ++
 .../kafka/connect/source/SourceTaskContext.java    |  11 ++
 .../kafka/connect/cli/ConnectDistributed.java      |   7 +-
 .../kafka/connect/runtime/AbstractHerder.java      |   1 +
 .../kafka/connect/runtime/ConnectorConfig.java     |  20 ++-
 .../org/apache/kafka/connect/runtime/Herder.java   |  20 +++
 .../kafka/connect/runtime/HerderRequest.java}      |  15 +-
 .../org/apache/kafka/connect/runtime/Worker.java   |  39 ++++-
 .../apache/kafka/connect/runtime/WorkerConfig.java |  10 ++
 .../connect/runtime/WorkerConfigTransformer.java   |  71 +++++++++
 .../kafka/connect/runtime/WorkerSinkTask.java      |   6 +-
 .../connect/runtime/WorkerSinkTaskContext.java     |  12 +-
 .../kafka/connect/runtime/WorkerSourceTask.java    |   6 +-
 .../connect/runtime/WorkerSourceTaskContext.java   |  16 +-
 .../runtime/distributed/ClusterConfigState.java    |  68 ++++++++-
 .../runtime/distributed/DistributedHerder.java     |  48 ++++--
 .../runtime/isolation/DelegatingClassLoader.java   |  10 ++
 .../runtime/isolation/PluginScanResult.java        |   9 ++
 .../connect/runtime/isolation/PluginType.java      |   2 +
 .../kafka/connect/runtime/isolation/Plugins.java   |  44 ++++++
 .../runtime/standalone/StandaloneHerder.java       |  79 +++++++++-
 .../connect/storage/KafkaConfigBackingStore.java   |   9 +-
 .../connect/storage/MemoryConfigBackingStore.java  |  12 +-
 .../kafka/connect/runtime/AbstractHerderTest.java  |  22 +--
 .../connect/runtime/ErrorHandlingTaskTest.java     |   7 +-
 .../runtime/WorkerConfigTransformerTest.java       | 146 ++++++++++++++++++
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |   4 +-
 .../runtime/WorkerSinkTaskThreadedTest.java        |   4 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |   4 +-
 .../apache/kafka/connect/runtime/WorkerTest.java   |  12 +-
 .../runtime/distributed/DistributedHerderTest.java |  28 ++--
 .../runtime/standalone/StandaloneHerderTest.java   |  30 +++-
 .../storage/KafkaConfigBackingStoreTest.java       |   2 +-
 42 files changed, 1394 insertions(+), 96 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ba48c38..5bf69b6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -83,7 +83,7 @@
               files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/>
 
     <suppress checks="ParameterNumber"
-              files="WorkerSourceTask.java"/>
+              files="(WorkerSinkTask|WorkerSourceTask).java"/>
     <suppress checks="ParameterNumber"
               files="WorkerCoordinator.java"/>
     <suppress checks="ParameterNumber"
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigChangeCallback.java
similarity index 69%
copy from connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
copy to clients/src/main/java/org/apache/kafka/common/config/ConfigChangeCallback.java
index 8eec1df..d4c9948 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigChangeCallback.java
@@ -14,17 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.source;
-
-import org.apache.kafka.connect.storage.OffsetStorageReader;
+package org.apache.kafka.common.config;
 
 /**
- * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
- * runtime.
+ * A callback passed to {@link ConfigProvider} for subscribing to changes.
  */
-public interface SourceTaskContext {
+public interface ConfigChangeCallback {
+
     /**
-     * Get the OffsetStorageReader for this SourceTask.
+     * Performs an action when configuration data changes.
+     *
+     * @param path the path at which the data resides
+     * @param data the configuration data
      */
-    OffsetStorageReader offsetStorageReader();
+    void onChange(String path, ConfigData data);
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigData.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigData.java
new file mode 100644
index 0000000..2bd0ff6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigData.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.util.Map;
+
+/**
+ * Configuration data from a {@link ConfigProvider}.
+ */
+public class ConfigData {
+
+    private final Map<String, String> data;
+    private final Long ttl;
+
+    /**
+     * Creates a new ConfigData with the given data and TTL (in milliseconds).
+     *
+     * @param data a Map of key-value pairs
+     * @param ttl the time-to-live of the data in milliseconds, or null if there is no TTL
+     */
+    public ConfigData(Map<String, String> data, Long ttl) {
+        this.data = data;
+        this.ttl = ttl;
+    }
+
+    /**
+     * Creates a new ConfigData with the given data.
+     *
+     * @param data a Map of key-value pairs
+     */
+    public ConfigData(Map<String, String> data) {
+        this(data, null);
+    }
+
+    /**
+     * Returns the data.
+     *
+     * @return data a Map of key-value pairs
+     */
+    public Map<String, String> data() {
+        return data;
+    }
+
+    /**
+     * Returns the TTL (in milliseconds).
+     *
+     * @return ttl the time-to-live (in milliseconds) of the data, or null if there is no TTL
+     */
+    public Long ttl() {
+        return ttl;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigProvider.java
new file mode 100644
index 0000000..7133baa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import org.apache.kafka.common.Configurable;
+
+import java.io.Closeable;
+import java.util.Set;
+
+/**
+ * A provider of configuration data, which may optionally support subscriptions to configuration changes.
+ */
+public interface ConfigProvider extends Configurable, Closeable {
+
+    /**
+     * Retrieves the data at the given path.
+     *
+     * @param path the path where the data resides
+     * @return the configuration data
+     */
+    ConfigData get(String path);
+
+    /**
+     * Retrieves the data with the given keys at the given path.
+     *
+     * @param path the path where the data resides
+     * @param keys the keys whose values will be retrieved
+     * @return the configuration data
+     */
+    ConfigData get(String path, Set<String> keys);
+
+    /**
+     * Subscribes to changes for the given keys at the given path (optional operation).
+     *
+     * @param path the path where the data resides
+     * @param keys the keys whose values will be retrieved
+     * @param callback the callback to invoke upon change
+     * @throws UnsupportedOperationException if the subscribe operation is not supported
+     */
+    default void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Unsubscribes from changes for the given keys at the given path (optional operation).
+     *
+     * @param path the path where the data resides
+     * @param keys the keys whose changes will no longer be reported
+     * @param callback the callback to be unsubscribed from changes
+     * @throws UnsupportedOperationException if the unsubscribe operation is not supported
+     */
+    default void unsubscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Clears all subscribers (optional operation).
+     *
+     * @throws UnsupportedOperationException if the unsubscribeAll operation is not supported
+     */
+    default void unsubscribeAll() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
new file mode 100644
index 0000000..7c3c516
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class wraps a set of {@link ConfigProvider} instances and uses them to perform
+ * transformations.
+ *
+ * <p>The default variable pattern is of the form <code>${provider:[path:]key}</code>,
+ * where the <code>provider</code> corresponds to a {@link ConfigProvider} instance, as passed to
+ * {@link ConfigTransformer#ConfigTransformer(Map)}.  The pattern will extract a set
+ * of paths (which are optional) and keys and then pass them to {@link ConfigProvider#get(String, Set)} to obtain the
+ * values with which to replace the variables.
+ *
+ * <p>For example, if a Map consisting of an entry with a provider name "file" and provider instance
+ * {@link FileConfigProvider} is passed to the {@link ConfigTransformer#ConfigTransformer(Map)}, and a Properties
+ * file with contents
+ * <pre>
+ * fileKey=someValue
+ * </pre>
+ * resides at the path "/tmp/properties.txt", then when a configuration Map which has an entry with a key "someKey" and
+ * a value "${file:/tmp/properties.txt:fileKey}" is passed to the {@link #transform(Map)} method, then the transformed
+ * Map will have an entry with key "someKey" and a value "someValue".
+ *
+ * <p>This class only depends on {@link ConfigProvider#get(String, Set)} and does not depend on subscription support
+ * in a {@link ConfigProvider}, such as the {@link ConfigProvider#subscribe(String, Set, ConfigChangeCallback)} and
+ * {@link ConfigProvider#unsubscribe(String, Set, ConfigChangeCallback)} methods.
+ */
+public class ConfigTransformer {
+    private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
+    private static final String EMPTY_PATH = "";
+
+    private final Map<String, ConfigProvider> configProviders;
+
+    /**
+     * Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
+     *
+     * @param configProviders a Map of provider names and {@link ConfigProvider} instances.
+     */
+    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
+        this.configProviders = configProviders;
+    }
+
+    /**
+     * Transforms the given configuration data by using the {@link ConfigProvider} instances to
+     * look up values to replace the variables in the pattern.
+     *
+     * @param configs the configuration values to be transformed
+     * @return an instance of {@link ConfigTransformerResult}
+     */
+    public ConfigTransformerResult transform(Map<String, String> configs) {
+        Map<String, Map<String, Set<String>>> keysByProvider = new HashMap<>();
+        Map<String, Map<String, Map<String, String>>> lookupsByProvider = new HashMap<>();
+
+        // Collect the variables from the given configs that need transformation
+        for (Map.Entry<String, String> config : configs.entrySet()) {
+            List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), DEFAULT_PATTERN);
+            for (ConfigVariable var : vars) {
+                Map<String, Set<String>> keysByPath = keysByProvider.computeIfAbsent(var.providerName, k -> new HashMap<>());
+                Set<String> keys = keysByPath.computeIfAbsent(var.path, k -> new HashSet<>());
+                keys.add(var.variable);
+            }
+        }
+
+        // Retrieve requested variables from the ConfigProviders
+        Map<String, Long> ttls = new HashMap<>();
+        for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
+            String providerName = entry.getKey();
+            ConfigProvider provider = configProviders.get(providerName);
+            Map<String, Set<String>> keysByPath = entry.getValue();
+            if (provider != null && keysByPath != null) {
+                for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
+                    String path = pathWithKeys.getKey();
+                    Set<String> keys = new HashSet<>(pathWithKeys.getValue());
+                    ConfigData configData = provider.get(path, keys);
+                    Map<String, String> data = configData.data();
+                    Long ttl = configData.ttl();
+                    if (ttl != null && ttl >= 0) {
+                        ttls.put(path, ttl);
+                    }
+                    Map<String, Map<String, String>> keyValuesByPath =
+                            lookupsByProvider.computeIfAbsent(providerName, k -> new HashMap<>());
+                    keyValuesByPath.put(path, data);
+                }
+            }
+        }
+
+        // Perform the transformations by performing variable replacements
+        Map<String, String> data = new HashMap<>(configs);
+        for (Map.Entry<String, String> config : configs.entrySet()) {
+            data.put(config.getKey(), replace(lookupsByProvider, config.getValue(), DEFAULT_PATTERN));
+        }
+        return new ConfigTransformerResult(data, ttls);
+    }
+
+    private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) {
+        List<ConfigVariable> configVars = new ArrayList<>();
+        Matcher matcher = pattern.matcher(value);
+        while (matcher.find()) {
+            configVars.add(new ConfigVariable(matcher));
+        }
+        return configVars;
+    }
+
+    private static String replace(Map<String, Map<String, Map<String, String>>> lookupsByProvider,
+                                  String value,
+                                  Pattern pattern) {
+        Matcher matcher = pattern.matcher(value);
+        StringBuilder builder = new StringBuilder();
+        int i = 0;
+        while (matcher.find()) {
+            ConfigVariable configVar = new ConfigVariable(matcher);
+            Map<String, Map<String, String>> lookupsByPath = lookupsByProvider.get(configVar.providerName);
+            if (lookupsByPath != null) {
+                Map<String, String> keyValues = lookupsByPath.get(configVar.path);
+                String replacement = keyValues.get(configVar.variable);
+                builder.append(value, i, matcher.start());
+                if (replacement == null) {
+                    // No replacements will be performed; just return the original value
+                    builder.append(matcher.group(0));
+                } else {
+                    builder.append(replacement);
+                }
+                i = matcher.end();
+            }
+        }
+        builder.append(value, i, value.length());
+        return builder.toString();
+    }
+
+    private static class ConfigVariable {
+        final String providerName;
+        final String path;
+        final String variable;
+
+        ConfigVariable(Matcher matcher) {
+            this.providerName = matcher.group(1);
+            this.path = matcher.group(3) != null ? matcher.group(3) : EMPTY_PATH;
+            this.variable = matcher.group(4);
+        }
+
+        public String toString() {
+            return "(" + providerName + ":" + (path != null ? path + ":" : "") + variable + ")";
+        }
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformerResult.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformerResult.java
new file mode 100644
index 0000000..df7bea6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformerResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.util.Map;
+
+/**
+ * The result of a transformation from {@link ConfigTransformer}.
+ */
+public class ConfigTransformerResult {
+
+    private Map<String, Long> ttls;
+    private Map<String, String> data;
+
+    /**
+     * Creates a new ConfigTransformerResult with the given data and TTL values for a set of paths.
+     *
+     * @param data a Map of key-value pairs
+     * @param ttls a Map of path and TTL values (in milliseconds)
+     */
+    public ConfigTransformerResult(Map<String, String> data, Map<String, Long> ttls) {
+        this.data = data;
+        this.ttls = ttls;
+    }
+
+    /**
+     * Returns the transformed data, with variables replaced with corresponding values from the
+     * ConfigProvider instances if found.
+     *
+     * <p>Modifying the transformed data that is returned does not affect the {@link ConfigProvider} nor the
+     * original data that was used as the source of the transformation.
+     *
+     * @return data a Map of key-value pairs
+     */
+    public Map<String, String> data() {
+        return data;
+    }
+
+    /**
+     * Returns the TTL values (in milliseconds) returned from the ConfigProvider instances for a given set of paths.
+     *
+     * @return data a Map of path and TTL values
+     */
+    public Map<String, Long> ttls() {
+        return ttls;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java
new file mode 100644
index 0000000..fefc935
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * An implementation of {@link ConfigProvider} that represents a Properties file.
+ * All property keys and values are stored as cleartext.
+ */
+public class FileConfigProvider implements ConfigProvider {
+
+    public void configure(Map<String, ?> configs) {
+    }
+
+    /**
+     * Retrieves the data at the given Properties file.
+     *
+     * @param path the file where the data resides
+     * @return the configuration data
+     */
+    public ConfigData get(String path) {
+        Map<String, String> data = new HashMap<>();
+        if (path == null || path.isEmpty()) {
+            return new ConfigData(data);
+        }
+        try (Reader reader = reader(path)) {
+            Properties properties = new Properties();
+            properties.load(reader);
+            Enumeration<Object> keys = properties.keys();
+            while (keys.hasMoreElements()) {
+                String key = keys.nextElement().toString();
+                String value = properties.getProperty(key);
+                if (value != null) {
+                    data.put(key, value);
+                }
+            }
+            return new ConfigData(data);
+        } catch (IOException e) {
+            throw new ConfigException("Could not read properties from file " + path);
+        }
+    }
+
+    /**
+     * Retrieves the data with the given keys at the given Properties file.
+     *
+     * @param path the file where the data resides
+     * @param keys the keys whose values will be retrieved
+     * @return the configuration data
+     */
+    public ConfigData get(String path, Set<String> keys) {
+        Map<String, String> data = new HashMap<>();
+        if (path == null || path.isEmpty()) {
+            return new ConfigData(data);
+        }
+        try (Reader reader = reader(path)) {
+            Properties properties = new Properties();
+            properties.load(reader);
+            for (String key : keys) {
+                String value = properties.getProperty(key);
+                if (value != null) {
+                    data.put(key, value);
+                }
+            }
+            return new ConfigData(data);
+        } catch (IOException e) {
+            throw new ConfigException("Could not read properties from file " + path);
+        }
+    }
+
+    // visible for testing
+    protected Reader reader(String path) throws IOException {
+        return new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8);
+    }
+
+    public void close() {
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
new file mode 100644
index 0000000..7bc74f3
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ConfigTransformerTest {
+
+    public static final String MY_KEY = "myKey";
+    public static final String TEST_INDIRECTION = "testIndirection";
+    public static final String TEST_KEY = "testKey";
+    public static final String TEST_KEY_WITH_TTL = "testKeyWithTTL";
+    public static final String TEST_PATH = "testPath";
+    public static final String TEST_RESULT = "testResult";
+    public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
+
+    private ConfigTransformer configTransformer;
+
+    @Before
+    public void setup() {
+        configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
+    }
+
+    @Test
+    public void testReplaceVariable() throws Exception {
+        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+        Map<String, String> data = result.data();
+        Map<String, Long> ttls = result.ttls();
+        assertEquals(TEST_RESULT, data.get(MY_KEY));
+        assertTrue(ttls.isEmpty());
+    }
+
+    @Test
+    public void testReplaceVariableWithTTL() throws Exception {
+        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        Map<String, String> data = result.data();
+        Map<String, Long> ttls = result.ttls();
+        assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY));
+        assertEquals(1L, ttls.get(TEST_PATH).longValue());
+    }
+
+    @Test
+    public void testReplaceMultipleVariablesInValue() throws Exception {
+        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
+        Map<String, String> data = result.data();
+        assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY));
+    }
+
+    @Test
+    public void testNoReplacement() throws Exception {
+        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}"));
+        Map<String, String> data = result.data();
+        assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
+    }
+
+    @Test
+    public void testSingleLevelOfIndirection() throws Exception {
+        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}"));
+        Map<String, String> data = result.data();
+        assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
+    }
+
+    public static class TestConfigProvider implements ConfigProvider {
+
+        public void configure(Map<String, ?> configs) {
+        }
+
+        public ConfigData get(String path) {
+            return null;
+        }
+
+        public ConfigData get(String path, Set<String> keys) {
+            Map<String, String> data = new HashMap<>();
+            Long ttl = null;
+            if (path.equals(TEST_PATH)) {
+                if (keys.contains(TEST_KEY)) {
+                    data.put(TEST_KEY, TEST_RESULT);
+                }
+                if (keys.contains(TEST_KEY_WITH_TTL)) {
+                    data.put(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL);
+                    ttl = 1L;
+                }
+                if (keys.contains(TEST_INDIRECTION)) {
+                    data.put(TEST_INDIRECTION, "${test:testPath:testResult}");
+                }
+            }
+            return new ConfigData(data, ttl);
+        }
+
+        public void close() {
+        }
+    }
+
+}
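
Note on the ConfigTransformerTest cases above: the behaviour they pin down (multiple references in one value, unknown keys left as-is, and only one level of indirection) can be illustrated with a single regex pass. The following is a minimal sketch, not the ConfigTransformer source from this commit. The class name, the regex, and the flat "provider:path:key" lookup map are all assumptions made for illustration; the lookup map stands in for real ConfigProvider instances.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only; names and regex are hypothetical, not taken from the commit.
class VariableSubstitutionSketch {

    // Matches ${provider:key} or ${provider:path:key}; the path group is optional.
    private static final Pattern VAR =
            Pattern.compile("\\$\\{([^}:]+):(?:([^}:]*):)?([^}:]+)\\}");

    // lookup maps "provider:path:key" to a resolved value (stand-in for ConfigProvider lookups).
    static String substitute(String value, Map<String, String> lookup) {
        Matcher m = VAR.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String path = m.group(2) == null ? "" : m.group(2);
            String resolved = lookup.get(m.group(1) + ":" + path + ":" + m.group(3));
            // Unknown references are copied through unchanged.
            String replacement = resolved != null ? resolved : m.group(0);
            // quoteReplacement keeps '$' in resolved values literal; the matcher scans only
            // the original input, so resolved values are never expanded a second time.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Because the matcher only walks the original string, a resolved value that itself looks like a variable reference is emitted verbatim, which is the property testSingleLevelOfIndirection checks.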
diff --git a/clients/src/test/java/org/apache/kafka/common/config/FileConfigProviderTest.java b/clients/src/test/java/org/apache/kafka/common/config/FileConfigProviderTest.java
new file mode 100644
index 0000000..9157e38
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/FileConfigProviderTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class FileConfigProviderTest {
+
+    private FileConfigProvider configProvider;
+
+    @Before
+    public void setup() {
+        configProvider = new TestFileConfigProvider();
+    }
+
+    @Test
+    public void testGetAllKeysAtPath() throws Exception {
+        ConfigData configData = configProvider.get("dummy");
+        Map<String, String> result = new HashMap<>();
+        result.put("testKey", "testResult");
+        result.put("testKey2", "testResult2");
+        assertEquals(result, configData.data());
+        assertEquals(null, configData.ttl());
+    }
+
+    @Test
+    public void testGetOneKeyAtPath() throws Exception {
+        ConfigData configData = configProvider.get("dummy", Collections.singleton("testKey"));
+        Map<String, String> result = new HashMap<>();
+        result.put("testKey", "testResult");
+        assertEquals(result, configData.data());
+        assertEquals(null, configData.ttl());
+    }
+
+    @Test
+    public void testEmptyPath() throws Exception {
+        ConfigData configData = configProvider.get("");
+        assertTrue(configData.data().isEmpty());
+        assertEquals(null, configData.ttl());
+    }
+
+    @Test
+    public void testEmptyPathWithKey() throws Exception {
+        ConfigData configData = configProvider.get("", Collections.singleton("testKey"));
+        assertTrue(configData.data().isEmpty());
+        assertEquals(null, configData.ttl());
+    }
+
+    @Test
+    public void testNullPath() throws Exception {
+        ConfigData configData = configProvider.get(null);
+        assertTrue(configData.data().isEmpty());
+        assertEquals(null, configData.ttl());
+    }
+
+    @Test
+    public void testNullPathWithKey() throws Exception {
+        ConfigData configData = configProvider.get(null, Collections.singleton("testKey"));
+        assertTrue(configData.data().isEmpty());
+        assertEquals(null, configData.ttl());
+    }
+
+    public static class TestFileConfigProvider extends FileConfigProvider {
+
+        @Override
+        protected Reader reader(String path) throws IOException {
+            return new StringReader("testKey=testResult\ntestKey2=testResult2");
+        }
+    }
+}
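
Note on the FileConfigProviderTest cases above: the stubbed reader returns text in java.util.Properties format, and the tests expect either every key or only the requested keys to come back. A minimal sketch of that lookup follows. It is not the FileConfigProvider source (which is not part of this excerpt); the class and method names are hypothetical, and treating a null key set as "all keys" is an assumption made for the sketch.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Illustrative sketch only; not the provider implementation from the commit.
class PropertiesLookupSketch {

    // Reads key=value pairs from the reader and keeps those named in keys.
    // A null key set means "return everything" (an assumption for this sketch).
    static Map<String, String> read(Reader reader, Set<String> keys) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> data = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (keys == null || keys.contains(name)) {
                data.put(name, props.getProperty(name));
            }
        }
        return data;
    }
}
```

Calling read with a StringReader over "testKey=testResult\ntestKey2=testResult2" and a singleton key set of "testKey" yields a one-entry map, mirroring testGetOneKeyAtPath.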
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
index 1e214be..340ef80 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
@@ -25,6 +25,16 @@ import java.util.Set;
  * Context passed to SinkTasks, allowing them to access utilities in the Kafka Connect runtime.
  */
 public interface SinkTaskContext {
+
+    /**
+     * Get the Task configuration.  This is the latest configuration and may differ from that passed on startup.
+     *
+     * For example, this method can be used to obtain the latest configuration if an external secret has changed,
+     * and the configuration is using variable references such as those compatible with
+     * {@link org.apache.kafka.common.config.ConfigTransformer}.
+     */
+    public Map<String, String> configs();
+
     /**
      * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets
      * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
index 8eec1df..2e87986 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
@@ -18,12 +18,23 @@ package org.apache.kafka.connect.source;
 
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 
+import java.util.Map;
+
 /**
  * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
  * runtime.
  */
 public interface SourceTaskContext {
     /**
+     * Get the Task configuration.  This is the latest configuration and may differ from that passed on startup.
+     *
+     * For example, this method can be used to obtain the latest configuration if an external secret has changed,
+     * and the configuration is using variable references such as those compatible with
+     * {@link org.apache.kafka.common.config.ConfigTransformer}.
+     */
+    public Map<String, String> configs();
+
+    /**
      * Get the OffsetStorageReader for this SourceTask.
      */
     OffsetStorageReader offsetStorageReader();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 54854fe4..f8c15de 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
@@ -85,12 +86,16 @@ public class ConnectDistributed {
             offsetBackingStore.configure(config);
 
             Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
+            WorkerConfigTransformer configTransformer = worker.configTransformer();
 
             Converter internalValueConverter = worker.getInternalValueConverter();
             StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
             statusBackingStore.configure(config);
 
-            ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config);
+            ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
+                    internalValueConverter,
+                    config,
+                    configTransformer);
 
             DistributedHerder herder = new DistributedHerder(config, time, worker,
                     kafkaClusterId, statusBackingStore, configBackingStore,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index c315686..b5e0ec2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -91,6 +91,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
                           StatusBackingStore statusBackingStore,
                           ConfigBackingStore configBackingStore) {
         this.worker = worker;
+        this.worker.herder = this;
         this.workerId = workerId;
         this.kafkaClusterId = kafkaClusterId;
         this.statusBackingStore = statusBackingStore;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index a8dd49a..c54c160 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -38,6 +38,7 @@ import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 /**
  * <p>
@@ -91,6 +92,20 @@ public class ConnectorConfig extends AbstractConfig {
     private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records.";
     private static final String TRANSFORMS_DISPLAY = "Transforms";
 
+    public static final String CONFIG_RELOAD_ACTION_CONFIG = "config.action.reload";
+    private static final String CONFIG_RELOAD_ACTION_DOC =
+            "The action that Connect should take on the connector when changes in external " +
+            "configuration providers result in a change in the connector's configuration properties. " +
+            "A value of 'none' indicates that Connect will do nothing. " +
+            "A value of 'restart' indicates that Connect should restart/reload the connector with the " +
+            "updated configuration properties. " +
+            "The restart may actually be scheduled in the future if the external configuration provider " +
+            "indicates that a configuration value will expire in the future.";
+
+    private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action";
+    public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString();
+    public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString();
+
     private final EnrichedConnectorConfig enrichedConfig;
     private static class EnrichedConnectorConfig extends AbstractConfig {
         EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) {
@@ -120,7 +135,10 @@ public class ConnectorConfig extends AbstractConfig {
                             throw new ConfigException(name, value, "Duplicate alias provided.");
                         }
                     }
-                }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY);
+                }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
+                .define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
+                        in(CONFIG_RELOAD_ACTION_NONE, CONFIG_RELOAD_ACTION_RESTART), Importance.LOW,
+                        CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY);
     }
 
     public ConnectorConfig(Plugins plugins) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 855b08a..5c7cc14 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -149,6 +149,12 @@ public interface Herder {
     void restartTask(ConnectorTaskId id, Callback<Void> cb);
 
     /**
+     * Get the configuration reload action.
+     * @param connName name of the connector
+     */
+    ConfigReloadAction connectorConfigReloadAction(final String connName);
+
+    /**
      * Restart the connector.
      * @param connName name of the connector
      * @param cb callback to invoke upon completion
@@ -156,6 +162,15 @@ public interface Herder {
     void restartConnector(String connName, Callback<Void> cb);
 
     /**
+     * Restart the connector after the given delay.
+     * @param delayMs delay in milliseconds before the restart
+     * @param connName name of the connector
+     * @param cb callback to invoke upon completion
+     * @return a handle that can be used to cancel the scheduled restart
+     */
+    HerderRequest restartConnector(long delayMs, String connName, Callback<Void> cb);
+
+    /**
      * Pause the connector. This call will asynchronously suspend processing by the connector and all
      * of its tasks.
      * @param connector name of the connector
@@ -183,6 +198,11 @@ public interface Herder {
      */
     String kafkaClusterId();
 
+    enum ConfigReloadAction {
+        NONE,
+        RESTART
+    }
+
     class Created<T> {
         private final boolean created;
         private final T result;
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderRequest.java
similarity index 68%
copy from connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
copy to connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderRequest.java
index 8eec1df..627da4d 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderRequest.java
@@ -14,17 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.source;
+package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.connect.storage.OffsetStorageReader;
-
-/**
- * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
- * runtime.
- */
-public interface SourceTaskContext {
-    /**
-     * Get the OffsetStorageReader for this SourceTask.
-     */
-    OffsetStorageReader offsetStorageReader();
+public interface HerderRequest {
+    void cancel();
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index c58eddf..7a72a0e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigProvider;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Frequencies;
 import org.apache.kafka.common.metrics.stats.Total;
@@ -30,6 +31,7 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.ErrorReporter;
@@ -76,6 +78,7 @@ import java.util.concurrent.Executors;
 public class Worker {
     private static final Logger log = LoggerFactory.getLogger(Worker.class);
 
+    protected Herder herder;
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
@@ -91,6 +94,7 @@ public class Worker {
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
+    private WorkerConfigTransformer workerConfigTransformer;
 
     public Worker(
             String workerId,
@@ -122,6 +126,8 @@ public class Worker {
         this.offsetBackingStore = offsetBackingStore;
         this.offsetBackingStore.configure(config);
 
+        this.workerConfigTransformer = initConfigTransformer();
+
         producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
@@ -137,6 +143,28 @@ public class Worker {
         producerProps.putAll(config.originalsWithPrefix("producer."));
     }
 
+    private WorkerConfigTransformer initConfigTransformer() {
+        final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
+        Map<String, ConfigProvider> providerMap = new HashMap<>();
+        for (String providerName : providerNames) {
+            ConfigProvider configProvider = plugins.newConfigProvider(
+                    config,
+                    WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
+                    ClassLoaderUsage.PLUGINS
+            );
+            providerMap.put(providerName, configProvider);
+        }
+        return new WorkerConfigTransformer(this, providerMap);
+    }
+
+    public WorkerConfigTransformer configTransformer() {
+        return workerConfigTransformer;
+    }
+
+    protected Herder herder() {
+        return herder;
+    }
+
     /**
      * Start worker.
      */
@@ -359,6 +387,7 @@ public class Worker {
      */
     public boolean startTask(
             ConnectorTaskId id,
+            ClusterConfigState configState,
             Map<String, String> connProps,
             Map<String, String> taskProps,
             TaskStatus.Listener statusListener,
@@ -419,7 +448,7 @@ public class Worker {
                 log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
             }
 
-            workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
+            workerTask = buildWorkerTask(configState, connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
             workerTask.initialize(taskConfig);
             Plugins.compareAndSwapLoaders(savedLoader);
         } catch (Throwable t) {
@@ -444,7 +473,8 @@ public class Worker {
         return true;
     }
 
-    private WorkerTask buildWorkerTask(ConnectorConfig connConfig,
+    private WorkerTask buildWorkerTask(ClusterConfigState configState,
+                                       ConnectorConfig connConfig,
                                        ConnectorTaskId id,
                                        Task task,
                                        TaskStatus.Listener statusListener,
@@ -469,13 +499,14 @@ public class Worker {
                     internalKeyConverter, internalValueConverter);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
 
+            // Note we pass the configState as it performs dynamic transformations under the covers
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader,
+                    headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader,
                     time, retryWithToleranceOperator);
         } else if (task instanceof SinkTask) {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, connConfig, errorHandlingMetrics));
-            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter,
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
                     valueConverter, headerConverter, transformationChain, loader, time,
                     retryWithToleranceOperator);
         } else {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 3c76d0f..355cfbb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -190,6 +191,11 @@ public class WorkerConfig extends AbstractConfig {
             + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
             + "/opt/connectors";
 
+    public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
+    protected static final String CONFIG_PROVIDERS_DOC = "List of configuration providers. "
+            + "This is a comma-separated list of names for the ConfigProvider implementations, each configured "
+            + "under the 'config.providers.<name>' prefix, in the order they will be created, configured, and used.";
+
     public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
     protected static final String REST_EXTENSION_CLASSES_DOC =
             "Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called "
@@ -262,6 +268,9 @@ public class WorkerConfig extends AbstractConfig {
                 .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         HEADER_CONVERTER_CLASS_DEFAULT,
                         Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
+                .define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
+                        Collections.emptyList(),
+                        Importance.LOW, CONFIG_PROVIDERS_DOC)
                 .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
                         Importance.LOW, REST_EXTENSION_CLASSES_DOC);
     }
@@ -334,4 +343,5 @@ public class WorkerConfig extends AbstractConfig {
         super(definition, props);
         logInternalConverterDeprecationWarnings(props);
     }
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
new file mode 100644
index 0000000..d91411c
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigProvider;
+import org.apache.kafka.common.config.ConfigTransformer;
+import org.apache.kafka.common.config.ConfigTransformerResult;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A wrapper class to perform configuration transformations and schedule reloads for any
+ * retrieved TTL values.
+ */
+public class WorkerConfigTransformer {
+    private final Worker worker;
+    private final ConfigTransformer configTransformer;
+    private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
+
+    public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
+        this.worker = worker;
+        this.configTransformer = new ConfigTransformer(configProviders);
+    }
+
+    public Map<String, String> transform(String connectorName, Map<String, String> configs) {
+        ConfigTransformerResult result = configTransformer.transform(configs);
+        scheduleReload(connectorName, result.ttls());
+        return result.data();
+    }
+
+    private void scheduleReload(String connectorName, Map<String, Long> ttls) {
+        for (Map.Entry<String, Long> entry : ttls.entrySet()) {
+            scheduleReload(connectorName, entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void scheduleReload(String connectorName, String path, long ttl) {
+        Herder herder = worker.herder();
+        if (herder.connectorConfigReloadAction(connectorName) == Herder.ConfigReloadAction.RESTART) {
+            Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
+            if (connectorRequests == null) {
+                connectorRequests = new ConcurrentHashMap<>();
+                requests.put(connectorName, connectorRequests);
+            } else {
+                HerderRequest previousRequest = connectorRequests.get(path);
+                if (previousRequest != null) {
+                    // Cancel the previous request for this path; its TTL is now stale
+                    previousRequest.cancel();
+                }
+            }
+            HerderRequest request = herder.restartConnector(ttl, connectorName, null);
+            connectorRequests.put(path, request);
+        }
+    }
+}
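
Note on WorkerConfigTransformer above: scheduleReload keeps at most one pending restart per connector and path, cancelling the earlier request whenever a fresh TTL arrives. The same cancel-and-reschedule pattern can be sketched with a ScheduledExecutorService. This is an illustration under stated assumptions, not how the herder implements restartConnector(delayMs, ...): the class name, the use of ScheduledFuture in place of HerderRequest, and the boolean return value are all hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; ScheduledFuture stands in for HerderRequest.
class ReloadSchedulerSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    // connector name -> (path -> pending request)
    private final ConcurrentMap<String, ConcurrentMap<String, ScheduledFuture<?>>> pending =
            new ConcurrentHashMap<>();

    // Schedules action to run after ttlMs and cancels any earlier request for the
    // same connector and path. Returns true if an earlier request was replaced.
    boolean schedule(String connectorName, String path, long ttlMs, Runnable action) {
        ScheduledFuture<?> next = executor.schedule(action, ttlMs, TimeUnit.MILLISECONDS);
        // computeIfAbsent creates the per-connector map atomically.
        ScheduledFuture<?> previous = pending
                .computeIfAbsent(connectorName, k -> new ConcurrentHashMap<>())
                .put(path, next);
        if (previous != null) {
            previous.cancel(false); // the older TTL is stale
            return true;
        }
        return false;
    }

    // Stops the executor and drops all pending requests.
    void shutdown() {
        executor.shutdownNow();
    }
}
```

Using computeIfAbsent for the per-connector map avoids the separate get-then-put step; whether that matters in the commit depends on which threads call transform, which this excerpt does not show.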
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 3296007..47f8529 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -40,6 +40,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -70,6 +71,7 @@ class WorkerSinkTask extends WorkerTask {
 
     private final WorkerConfig workerConfig;
     private final SinkTask task;
+    private final ClusterConfigState configState;
     private Map<String, String> taskConfig;
     private final Time time;
     private final Converter keyConverter;
@@ -96,6 +98,7 @@ class WorkerSinkTask extends WorkerTask {
                           TaskStatus.Listener statusListener,
                           TargetState initialState,
                           WorkerConfig workerConfig,
+                          ClusterConfigState configState,
                           ConnectMetrics connectMetrics,
                           Converter keyConverter,
                           Converter valueConverter,
@@ -108,6 +111,7 @@ class WorkerSinkTask extends WorkerTask {
 
         this.workerConfig = workerConfig;
         this.task = task;
+        this.configState = configState;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
         this.headerConverter = headerConverter;
@@ -133,7 +137,7 @@ class WorkerSinkTask extends WorkerTask {
         try {
             this.taskConfig = taskConfig.originalsStrings();
             this.consumer = createConsumer();
-            this.context = new WorkerSinkTaskContext(consumer, this);
+            this.context = new WorkerSinkTaskContext(consumer, this, configState);
         } catch (Throwable t) {
             log.error("{} Task failed initialization and will not be started.", this, t);
             onFailure(t);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 386f992..3a6b0d6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,18 +38,27 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
     private final WorkerSinkTask sinkTask;
+    private final ClusterConfigState configState;
     private final Set<TopicPartition> pausedPartitions;
     private boolean commitRequested;
 
-    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask sinkTask) {
+    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer,
+                                 WorkerSinkTask sinkTask,
+                                 ClusterConfigState configState) {
         this.offsets = new HashMap<>();
         this.timeoutMs = -1L;
         this.consumer = consumer;
         this.sinkTask = sinkTask;
+        this.configState = configState;
         this.pausedPartitions = new HashSet<>();
     }
 
     @Override
+    public Map<String, String> configs() {
+        return configState.taskConfig(sinkTask.id());
+    }
+
+    @Override
     public void offset(Map<TopicPartition, Long> offsets) {
         log.debug("{} Setting offsets for topic partitions {}", this, offsets);
         this.offsets.putAll(offsets);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index e7b92a4..70d0cf9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -34,6 +34,7 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -66,6 +67,7 @@ class WorkerSourceTask extends WorkerTask {
 
     private final WorkerConfig workerConfig;
     private final SourceTask task;
+    private final ClusterConfigState configState;
     private final Converter keyConverter;
     private final Converter valueConverter;
     private final HeaderConverter headerConverter;
@@ -103,6 +105,7 @@ class WorkerSourceTask extends WorkerTask {
                             OffsetStorageReader offsetReader,
                             OffsetStorageWriter offsetWriter,
                             WorkerConfig workerConfig,
+                            ClusterConfigState configState,
                             ConnectMetrics connectMetrics,
                             ClassLoader loader,
                             Time time,
@@ -112,6 +115,7 @@ class WorkerSourceTask extends WorkerTask {
 
         this.workerConfig = workerConfig;
         this.task = task;
+        this.configState = configState;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
         this.headerConverter = headerConverter;
@@ -190,7 +194,7 @@ class WorkerSourceTask extends WorkerTask {
     @Override
     public void execute() {
         try {
-            task.initialize(new WorkerSourceTaskContext(offsetReader));
+            task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
             task.start(taskConfig);
             log.info("{} Source task finished initialization and start", this);
             synchronized (this) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
index 8f60e57..fe1409b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
@@ -16,15 +16,29 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 
+import java.util.Map;
+
 public class WorkerSourceTaskContext implements SourceTaskContext {
 
     private final OffsetStorageReader reader;
+    private final WorkerSourceTask task;
+    private final ClusterConfigState configState;
 
-    public WorkerSourceTaskContext(OffsetStorageReader reader) {
+    public WorkerSourceTaskContext(OffsetStorageReader reader,
+                                   WorkerSourceTask task,
+                                   ClusterConfigState configState) {
         this.reader = reader;
+        this.task = task;
+        this.configState = configState;
+    }
+
+    @Override
+    public Map<String, String> configs() {
+        return configState.taskConfig(task.id());
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
index cac71dd..9507706 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.distributed;
 
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
@@ -24,6 +25,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -46,6 +48,7 @@ public class ClusterConfigState {
     private final Map<String, TargetState> connectorTargetStates;
     private final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
     private final Set<String> inconsistentConnectors;
+    private final WorkerConfigTransformer configTransformer;
 
     public ClusterConfigState(long offset,
                               Map<String, Integer> connectorTaskCounts,
@@ -53,12 +56,29 @@ public class ClusterConfigState {
                               Map<String, TargetState> connectorTargetStates,
                               Map<ConnectorTaskId, Map<String, String>> taskConfigs,
                               Set<String> inconsistentConnectors) {
+        this(offset,
+                connectorTaskCounts,
+                connectorConfigs,
+                connectorTargetStates,
+                taskConfigs,
+                inconsistentConnectors,
+                null);
+    }
+
+    public ClusterConfigState(long offset,
+                              Map<String, Integer> connectorTaskCounts,
+                              Map<String, Map<String, String>> connectorConfigs,
+                              Map<String, TargetState> connectorTargetStates,
+                              Map<ConnectorTaskId, Map<String, String>> taskConfigs,
+                              Set<String> inconsistentConnectors,
+                              WorkerConfigTransformer configTransformer) {
         this.offset = offset;
         this.connectorTaskCounts = connectorTaskCounts;
         this.connectorConfigs = connectorConfigs;
         this.connectorTargetStates = connectorTargetStates;
         this.taskConfigs = taskConfigs;
         this.inconsistentConnectors = inconsistentConnectors;
+        this.configTransformer = configTransformer;
     }
 
     /**
@@ -87,12 +107,19 @@ public class ClusterConfigState {
     }
 
     /**
-     * Get the configuration for a connector.
+     * Get the configuration for a connector.  The configuration will have been transformed by
+     * {@link org.apache.kafka.common.config.ConfigTransformer} by having all variable
+     * references replaced with the current values from external instances of
+     * {@link org.apache.kafka.common.config.ConfigProvider}, and may include secrets.
      * @param connector name of the connector
      * @return a map containing configuration parameters
      */
     public Map<String, String> connectorConfig(String connector) {
-        return connectorConfigs.get(connector);
+        Map<String, String> configs = connectorConfigs.get(connector);
+        if (configTransformer != null) {
+            configs = configTransformer.transform(connector, configs);
+        }
+        return configs;
     }
 
     /**
@@ -105,12 +132,19 @@ public class ClusterConfigState {
     }
 
     /**
-     * Get the configuration for a task.
+     * Get the configuration for a task.  The configuration will have been transformed by
+     * {@link org.apache.kafka.common.config.ConfigTransformer} by having all variable
+     * references replaced with the current values from external instances of
+     * {@link org.apache.kafka.common.config.ConfigProvider}, and may include secrets.
      * @param task id of the task
      * @return a map containing configuration parameters
      */
     public Map<String, String> taskConfig(ConnectorTaskId task) {
-        return taskConfigs.get(task);
+        Map<String, String> configs = taskConfigs.get(task);
+        if (configTransformer != null) {
+            configs = configTransformer.transform(task.connector(), configs);
+        }
+        return configs;
     }
 
     /**
@@ -184,4 +218,30 @@ public class ClusterConfigState {
                 ", inconsistentConnectors=" + inconsistentConnectors +
                 '}';
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClusterConfigState that = (ClusterConfigState) o;
+        return offset == that.offset &&
+                Objects.equals(connectorTaskCounts, that.connectorTaskCounts) &&
+                Objects.equals(connectorConfigs, that.connectorConfigs) &&
+                Objects.equals(connectorTargetStates, that.connectorTargetStates) &&
+                Objects.equals(taskConfigs, that.taskConfigs) &&
+                Objects.equals(inconsistentConnectors, that.inconsistentConnectors) &&
+                Objects.equals(configTransformer, that.configTransformer);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                offset,
+                connectorTaskCounts,
+                connectorConfigs,
+                connectorTargetStates,
+                taskConfigs,
+                inconsistentConnectors,
+                configTransformer);
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 5e9707a..5efb78a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -38,6 +38,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.HerderRequest;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
@@ -60,6 +61,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
@@ -139,7 +141,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     // To handle most external requests, like creating or destroying a connector, we can use a generic request where
     // the caller specifies all the code that should be executed.
-    final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>();
+    final NavigableSet<DistributedHerderRequest> requests = new ConcurrentSkipListSet<>();
     // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
     private Set<String> connectorConfigUpdates = new HashSet<>();
@@ -255,7 +257,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         final long now = time.milliseconds();
         long nextRequestTimeoutMs = Long.MAX_VALUE;
         while (true) {
-            final HerderRequest next = peekWithoutException();
+            final DistributedHerderRequest next = peekWithoutException();
             if (next == null) {
                 break;
             } else if (now >= next.at) {
@@ -382,7 +384,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
             // Explicitly fail any outstanding requests so they actually get a response and get an
             // understandable reason for their failure.
-            HerderRequest request = requests.pollFirst();
+            DistributedHerderRequest request = requests.pollFirst();
             while (request != null) {
                 request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
                 request = requests.pollFirst();
@@ -641,8 +643,20 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
+    public ConfigReloadAction connectorConfigReloadAction(final String connName) {
+        return ConfigReloadAction.valueOf(
+                configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
+                        .toUpperCase(Locale.ROOT));
+    }
+
+    @Override
     public void restartConnector(final String connName, final Callback<Void> callback) {
-        addRequest(new Callable<Void>() {
+        restartConnector(0, connName, callback);
+    }
+
+    @Override
+    public HerderRequest restartConnector(final long delayMs, final String connName, final Callback<Void> callback) {
+        return addRequest(delayMs, new Callable<Void>() {
             @Override
             public Void call() throws Exception {
                 if (checkRebalanceNeeded(callback))
@@ -858,6 +872,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         log.info("Starting task {}", taskId);
         return worker.startTask(
                 taskId,
+                configState,
                 configState.connectorConfig(taskId.connector()),
                 configState.taskConfig(taskId),
                 this,
@@ -945,7 +960,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             public void onCompletion(Throwable error, Void result) {
                 // If we encountered an error, we don't have much choice but to just retry. If we don't, we could get
                 // stuck with a connector that thinks it has generated tasks, but wasn't actually successful and therefore
-                // never makes progress. The retry has to run through a HerderRequest since this callback could be happening
+                // never makes progress. The retry has to run through a DistributedHerderRequest since this callback could be happening
                 // from the HTTP request forwarding thread.
                 if (error != null) {
                     log.error("Failed to reconfigure connector's tasks, retrying after backoff:", error);
@@ -1041,19 +1056,19 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         return false;
     }
 
-    HerderRequest addRequest(Callable<Void> action, Callback<Void> callback) {
+    DistributedHerderRequest addRequest(Callable<Void> action, Callback<Void> callback) {
         return addRequest(0, action, callback);
     }
 
-    HerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
-        HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback);
+    DistributedHerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
+        DistributedHerderRequest req = new DistributedHerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback);
         requests.add(req);
         if (peekWithoutException() == req)
             member.wakeup();
         return req;
     }
 
-    private HerderRequest peekWithoutException() {
+    private DistributedHerderRequest peekWithoutException() {
         try {
             return requests.isEmpty() ? null : requests.first();
         } catch (NoSuchElementException e) {
@@ -1117,13 +1132,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
-    static class HerderRequest implements Comparable<HerderRequest> {
+    class DistributedHerderRequest implements HerderRequest, Comparable<DistributedHerderRequest> {
         private final long at;
         private final long seq;
         private final Callable<Void> action;
         private final Callback<Void> callback;
 
-        public HerderRequest(long at, long seq, Callable<Void> action, Callback<Void> callback) {
+        public DistributedHerderRequest(long at, long seq, Callable<Void> action, Callback<Void> callback) {
             this.at = at;
             this.seq = seq;
             this.action = action;
@@ -1139,7 +1154,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
 
         @Override
-        public int compareTo(HerderRequest o) {
+        public void cancel() {
+            DistributedHerder.this.requests.remove(this);
+        }
+
+        @Override
+        public int compareTo(DistributedHerderRequest o) {
             final int cmp = Long.compare(at, o.at);
             return cmp == 0 ? Long.compare(seq, o.seq) : cmp;
         }
@@ -1147,9 +1167,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
-            if (!(o instanceof HerderRequest))
+            if (!(o instanceof DistributedHerderRequest))
                 return false;
-            HerderRequest other = (HerderRequest) o;
+            DistributedHerderRequest other = (DistributedHerderRequest) o;
             return compareTo(other) == 0;
         }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index b56bd1a..1e59851 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.config.ConfigProvider;
 import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
@@ -66,6 +67,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     private final SortedSet<PluginDesc<Converter>> converters;
     private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
     private final SortedSet<PluginDesc<Transformation>> transformations;
+    private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
     private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
     private final List<String> pluginPaths;
     private final Map<Path, PluginClassLoader> activePaths;
@@ -80,6 +82,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         this.converters = new TreeSet<>();
         this.headerConverters = new TreeSet<>();
         this.transformations = new TreeSet<>();
+        this.configProviders = new TreeSet<>();
         this.restExtensions = new TreeSet<>();
     }
 
@@ -103,6 +106,10 @@ public class DelegatingClassLoader extends URLClassLoader {
         return transformations;
     }
 
+    public Set<PluginDesc<ConfigProvider>> configProviders() {
+        return configProviders;
+    }
+
     public Set<PluginDesc<ConnectRestExtension>> restExtensions() {
         return restExtensions;
     }
@@ -236,6 +243,8 @@ public class DelegatingClassLoader extends URLClassLoader {
             headerConverters.addAll(plugins.headerConverters());
             addPlugins(plugins.transformations(), loader);
             transformations.addAll(plugins.transformations());
+            addPlugins(plugins.configProviders(), loader);
+            configProviders.addAll(plugins.configProviders());
             addPlugins(plugins.restExtensions(), loader);
             restExtensions.addAll(plugins.restExtensions());
         }
@@ -292,6 +301,7 @@ public class DelegatingClassLoader extends URLClassLoader {
                 getPluginDesc(reflections, Converter.class, loader),
                 getPluginDesc(reflections, HeaderConverter.class, loader),
                 getPluginDesc(reflections, Transformation.class, loader),
+                getPluginDesc(reflections, ConfigProvider.class, loader),
                 getServiceLoaderPluginDesc(ConnectRestExtension.class, loader)
         );
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index 6f48e56..87b0b70 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.config.ConfigProvider;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.storage.Converter;
@@ -29,6 +30,7 @@ public class PluginScanResult {
     private final Collection<PluginDesc<Converter>> converters;
     private final Collection<PluginDesc<HeaderConverter>> headerConverters;
     private final Collection<PluginDesc<Transformation>> transformations;
+    private final Collection<PluginDesc<ConfigProvider>> configProviders;
     private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
 
     public PluginScanResult(
@@ -36,12 +38,14 @@ public class PluginScanResult {
             Collection<PluginDesc<Converter>> converters,
             Collection<PluginDesc<HeaderConverter>> headerConverters,
             Collection<PluginDesc<Transformation>> transformations,
+            Collection<PluginDesc<ConfigProvider>> configProviders,
             Collection<PluginDesc<ConnectRestExtension>> restExtensions
     ) {
         this.connectors = connectors;
         this.converters = converters;
         this.headerConverters = headerConverters;
         this.transformations = transformations;
+        this.configProviders = configProviders;
         this.restExtensions = restExtensions;
     }
 
@@ -61,6 +65,10 @@ public class PluginScanResult {
         return transformations;
     }
 
+    public Collection<PluginDesc<ConfigProvider>> configProviders() {
+        return configProviders;
+    }
+
     public Collection<PluginDesc<ConnectRestExtension>> restExtensions() {
         return restExtensions;
     }
@@ -70,6 +78,7 @@ public class PluginScanResult {
                && converters().isEmpty()
                && headerConverters().isEmpty()
                && transformations().isEmpty()
+               && configProviders().isEmpty()
                && restExtensions().isEmpty();
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
index 918f9d7..906b85f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.config.ConfigProvider;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -31,6 +32,7 @@ public enum PluginType {
     CONNECTOR(Connector.class),
     CONVERTER(Converter.class),
     TRANSFORMATION(Transformation.class),
+    CONFIGPROVIDER(ConfigProvider.class),
     REST_EXTENSION(ConnectRestExtension.class),
     UNKNOWN(Object.class);
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 9607410..c89accd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.isolation;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigProvider;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
@@ -147,6 +148,10 @@ public class Plugins {
         return delegatingLoader.transformations();
     }
 
+    public Set<PluginDesc<ConfigProvider>> configProviders() {
+        return delegatingLoader.configProviders();
+    }
+
     public Connector newConnector(String connectorClassOrAlias) {
         Class<? extends Connector> klass;
         try {
@@ -318,6 +323,45 @@ public class Plugins {
         return plugin;
     }
 
+    public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) {
+        String classPropertyName = providerPrefix + ".class";
+        Map<String, String> originalConfig = config.originalsStrings();
+        if (!originalConfig.containsKey(classPropertyName)) {
+            // This configuration does not define the config provider via the specified property name
+            return null;
+        }
+        ConfigProvider plugin = null;
+        switch (classLoaderUsage) {
+            case CURRENT_CLASSLOADER:
+                // Attempt to load first with the current classloader, and plugins as a fallback.
+                plugin = getInstance(config, classPropertyName, ConfigProvider.class);
+                break;
+            case PLUGINS:
+                // Attempt to load with the plugin class loader, which uses the current classloader as a fallback
+                String configProviderClassOrAlias = originalConfig.get(classPropertyName);
+                Class<? extends ConfigProvider> klass;
+                try {
+                    klass = pluginClass(delegatingLoader, configProviderClassOrAlias, ConfigProvider.class);
+                } catch (ClassNotFoundException e) {
+                    throw new ConnectException(
+                            "Failed to find any class that implements ConfigProvider and which name matches "
+                                    + configProviderClassOrAlias + ", available ConfigProviders are: "
+                                    + pluginNames(delegatingLoader.configProviders())
+                    );
+                }
+                plugin = newPlugin(klass);
+                break;
+        }
+        if (plugin == null) {
+            throw new ConnectException("Unable to instantiate the ConfigProvider specified in '" + classPropertyName + "'");
+        }
+
+        // Configure the ConfigProvider
+        String configPrefix = providerPrefix + ".param.";
+        Map<String, Object> configProviderConfig = config.originalsWithPrefix(configPrefix);
+        plugin.configure(configProviderConfig);
+        return plugin;
+    }
 
     /**
      * If the given class names are available in the classloader, return a list of new configured
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 96f8e87..20c6a24 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.HerderRequest;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
@@ -41,7 +42,14 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
@@ -50,10 +58,17 @@ import java.util.Map;
 public class StandaloneHerder extends AbstractHerder {
     private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
 
+    private final AtomicLong requestSeqNum = new AtomicLong();
+    private final ScheduledExecutorService requestExecutorService;
+
     private ClusterConfigState configState;
 
     public StandaloneHerder(Worker worker, String kafkaClusterId) {
-        this(worker, worker.workerId(), kafkaClusterId, new MemoryStatusBackingStore(), new MemoryConfigBackingStore());
+        this(worker,
+                worker.workerId(),
+                kafkaClusterId,
+                new MemoryStatusBackingStore(),
+                new MemoryConfigBackingStore(worker.configTransformer()));
     }
 
     // visible for testing
@@ -64,6 +79,7 @@ public class StandaloneHerder extends AbstractHerder {
                      MemoryConfigBackingStore configBackingStore) {
         super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
         this.configState = ClusterConfigState.EMPTY;
+        this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -77,6 +93,13 @@ public class StandaloneHerder extends AbstractHerder {
     @Override
     public synchronized void stop() {
         log.info("Herder stopping");
+        requestExecutorService.shutdown();
+        try {
+            if (!requestExecutorService.awaitTermination(30, TimeUnit.SECONDS))
+                requestExecutorService.shutdownNow();
+        } catch (InterruptedException e) {
+            // ignore
+        }
 
         // There's no coordination/hand-off to do here since this is all standalone. Instead, we
         // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
@@ -229,13 +252,20 @@ public class StandaloneHerder extends AbstractHerder {
 
         TargetState targetState = configState.targetState(taskId.connector());
         worker.stopAndAwaitTask(taskId);
-        if (worker.startTask(taskId, connConfigProps, taskConfigProps, this, targetState))
+        if (worker.startTask(taskId, configState, connConfigProps, taskConfigProps, this, targetState))
             cb.onCompletion(null, null);
         else
             cb.onCompletion(new ConnectException("Failed to start task: " + taskId), null);
     }
 
     @Override
+    public ConfigReloadAction connectorConfigReloadAction(final String connName) {
+        return ConfigReloadAction.valueOf(
+                configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
+                        .toUpperCase(Locale.ROOT));
+    }
+
+    @Override
     public synchronized void restartConnector(String connName, Callback<Void> cb) {
         if (!configState.contains(connName))
             cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
@@ -248,11 +278,24 @@ public class StandaloneHerder extends AbstractHerder {
             cb.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
     }
 
+    @Override
+    public synchronized HerderRequest restartConnector(long delayMs, final String connName, final Callback<Void> cb) {
+        ScheduledFuture<?> future = requestExecutorService.schedule(new Runnable() {
+            @Override
+            public void run() {
+                restartConnector(connName, cb);
+            }
+        }, delayMs, TimeUnit.MILLISECONDS);
+
+        return new StandaloneHerderRequest(requestSeqNum.incrementAndGet(), future);
+    }
+
     private boolean startConnector(Map<String, String> connectorProps) {
         String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
         configBackingStore.putConnectorConfig(connName, connectorProps);
+        Map<String, String> connConfigs = configState.connectorConfig(connName);
         TargetState targetState = configState.targetState(connName);
-        return worker.startConnector(connName, connectorProps, new HerderConnectorContext(this, connName), this, targetState);
+        return worker.startConnector(connName, connConfigs, new HerderConnectorContext(this, connName), this, targetState);
     }
 
     private List<Map<String, String>> recomputeTaskConfigs(String connName) {
@@ -270,7 +313,7 @@ public class StandaloneHerder extends AbstractHerder {
 
         for (ConnectorTaskId taskId : configState.tasks(connName)) {
             Map<String, String> taskConfigMap = configState.taskConfig(taskId);
-            worker.startTask(taskId, connConfigs, taskConfigMap, this, initialState);
+            worker.startTask(taskId, configState, connConfigs, taskConfigMap, this, initialState);
         }
     }
 
@@ -342,4 +385,32 @@ public class StandaloneHerder extends AbstractHerder {
         }
     }
 
+    static class StandaloneHerderRequest implements HerderRequest {
+        private final long seq;
+        private final ScheduledFuture<?> future;
+
+        public StandaloneHerderRequest(long seq, ScheduledFuture<?> future) {
+            this.seq = seq;
+            this.future = future;
+        }
+
+        @Override
+        public void cancel() {
+            future.cancel(false);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof StandaloneHerderRequest))
+                return false;
+            StandaloneHerderRequest other = (StandaloneHerderRequest) o;
+            return seq == other.seq;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(seq);
+        }
+    }
 }
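Note on the `StandaloneHerder` changes above: the delayed `restartConnector` overload schedules the restart on a single-threaded scheduler and returns a handle whose `cancel()` calls `future.cancel(false)`. A minimal stand-alone sketch of that schedule/cancel/reschedule pattern follows. The class `DelayedRestartSketch` and the method `runDemo` are hypothetical names for illustration only and do not exist in Kafka. The counter stands in for the real restart, and restoring the interrupt flag on `InterruptedException` is a choice made for this sketch, not what the commit does (it ignores the exception).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch; none of these names exist in Kafka.
class DelayedRestartSketch {

    // Schedules a "restart", cancels it, schedules a replacement, and
    // returns how many restarts actually ran.
    static int runDemo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger restarts = new AtomicInteger();
        // Stand-in for restartConnector(connName, cb): it only counts invocations.
        Runnable restart = () -> { restarts.incrementAndGet(); };
        try {
            // First request uses a long delay and is cancelled before it can fire.
            ScheduledFuture<?> first = executor.schedule(restart, 60_000L, TimeUnit.MILLISECONDS);
            first.cancel(false); // false: do not interrupt a task that is already running

            // The replacement request uses a short delay and is allowed to run.
            ScheduledFuture<?> second = executor.schedule(restart, 10L, TimeUnit.MILLISECONDS);
            second.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Sketch-only choice: keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            // Same shape as the shutdown sequence added to stop(): orderly first, forced second.
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS))
                    executor.shutdownNow();
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        return restarts.get();
    }

    public static void main(String[] args) {
        System.out.println("restarts run: " + runDemo());
    }
}
```

Because the first future is cancelled before its delay elapses, only the replacement task should contribute to the count. This is the same cancel-then-reschedule sequence that `testReplaceVariableWithTTLFirstCancelThenScheduleRestart` expects from the transformer.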
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index e51b365..ea19665 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -35,6 +35,7 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
@@ -221,7 +222,9 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
     private final Map<String, TargetState> connectorTargetStates = new HashMap<>();
 
-    public KafkaConfigBackingStore(Converter converter, WorkerConfig config) {
+    private final WorkerConfigTransformer configTransformer;
+
+    public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
         this.lock = new Object();
         this.started = false;
         this.converter = converter;
@@ -232,6 +235,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
             throw new ConfigException("Must specify topic for connector configuration.");
 
         configLog = setupAndCreateKafkaBasedLog(this.topic, config);
+        this.configTransformer = configTransformer;
     }
 
     @Override
@@ -270,7 +274,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                     new HashMap<>(connectorConfigs),
                     new HashMap<>(connectorTargetStates),
                     new HashMap<>(taskConfigs),
-                    new HashSet<>(inconsistent)
+                    new HashSet<>(inconsistent),
+                    configTransformer
             );
         }
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index 25891f5..7e7d62b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
@@ -32,6 +33,14 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
 
     private Map<String, ConnectorState> connectors = new HashMap<>();
     private UpdateListener updateListener;
+    private WorkerConfigTransformer configTransformer;
+
+    public MemoryConfigBackingStore() {
+    }
+
+    public MemoryConfigBackingStore(WorkerConfigTransformer configTransformer) {
+        this.configTransformer = configTransformer;
+    }
 
     @Override
     public synchronized void start() {
@@ -63,7 +72,8 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
                 connectorConfigs,
                 connectorTargetStates,
                 taskConfigs,
-                Collections.<String>emptySet());
+                Collections.<String>emptySet(),
+                configTransformer);
     }
 
     @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 0718eb1..da017e8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -172,14 +172,14 @@ public class AbstractHerderTest {
         assertEquals(TestSourceConnector.class.getName(), result.name());
         assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP), result.groups());
         assertEquals(2, result.errorCount());
-        // Base connector config has 7 fields, connector's configs add 2
-        assertEquals(9, result.values().size());
+        // Base connector config has 8 fields, connector's configs add 2
+        assertEquals(10, result.values().size());
         // Missing name should generate an error
         assertEquals(ConnectorConfig.NAME_CONFIG, result.values().get(0).configValue().name());
         assertEquals(1, result.values().get(0).configValue().errors().size());
         // "required" config from connector should generate an error
-        assertEquals("required", result.values().get(7).configValue().name());
-        assertEquals(1, result.values().get(7).configValue().errors().size());
+        assertEquals("required", result.values().get(8).configValue().name());
+        assertEquals(1, result.values().get(8).configValue().errors().size());
 
         verifyAll();
     }
@@ -233,15 +233,15 @@ public class AbstractHerderTest {
         );
         assertEquals(expectedGroups, result.groups());
         assertEquals(2, result.errorCount());
-        // Base connector config has 7 fields, connector's configs add 2, 2 type fields from the transforms, and
+        // Base connector config has 8 fields, connector's configs add 2, 2 type fields from the transforms, and
         // 1 from the valid transformation's config
-        assertEquals(12, result.values().size());
+        assertEquals(13, result.values().size());
         // Should get 2 type fields from the transforms, first adds its own config since it has a valid class
-        assertEquals("transforms.xformA.type", result.values().get(7).configValue().name());
-        assertTrue(result.values().get(7).configValue().errors().isEmpty());
-        assertEquals("transforms.xformA.subconfig", result.values().get(8).configValue().name());
-        assertEquals("transforms.xformB.type", result.values().get(9).configValue().name());
-        assertFalse(result.values().get(9).configValue().errors().isEmpty());
+        assertEquals("transforms.xformA.type", result.values().get(8).configValue().name());
+        assertTrue(result.values().get(8).configValue().errors().isEmpty());
+        assertEquals("transforms.xformA.subconfig", result.values().get(9).configValue().name());
+        assertEquals("transforms.xformB.type", result.values().get(10).configValue().name());
+        assertFalse(result.values().get(10).configValue().errors().isEmpty());
 
         verifyAll();
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 5a8bcc5..b50e7ff 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.LogReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
@@ -369,7 +370,8 @@ public class ErrorHandlingTaskTest {
 
         workerSinkTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, converter, converter,
+                taskId, sinkTask, statusListener, initialState, workerConfig,
+                ClusterConfigState.EMPTY, metrics, converter, converter,
                 headerConverter, sinkTransforms, pluginLoader, time, retryWithToleranceOperator);
     }
 
@@ -398,7 +400,8 @@ public class ErrorHandlingTaskTest {
         workerSourceTask = PowerMock.createPartialMock(
                 WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"},
                 taskId, sourceTask, statusListener, initialState, converter, converter, headerConverter, sourceTransforms,
-                producer, offsetReader, offsetWriter, workerConfig, metrics, pluginLoader, time, retryWithToleranceOperator);
+                producer, offsetReader, offsetWriter, workerConfig,
+                ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator);
     }
 
     private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
new file mode 100644
index 0000000..89bba09
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigChangeCallback;
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigProvider;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+public class WorkerConfigTransformerTest {
+
+    public static final String MY_KEY = "myKey";
+    public static final String MY_CONNECTOR = "myConnector";
+    public static final String TEST_KEY = "testKey";
+    public static final String TEST_PATH = "testPath";
+    public static final String TEST_KEY_WITH_TTL = "testKeyWithTTL";
+    public static final String TEST_KEY_WITH_LONGER_TTL = "testKeyWithLongerTTL";
+    public static final String TEST_RESULT = "testResult";
+    public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
+    public static final String TEST_RESULT_WITH_LONGER_TTL = "testResultWithLongerTTL";
+
+    @Mock private Herder herder;
+    @Mock private Worker worker;
+    @Mock private HerderRequest requestId;
+    private WorkerConfigTransformer configTransformer;
+
+    @Before
+    public void setup() {
+        worker = PowerMock.createMock(Worker.class);
+        herder = PowerMock.createMock(Herder.class);
+        configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider()));
+    }
+
+    @Test
+    public void testReplaceVariable() throws Exception {
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+        assertEquals(TEST_RESULT, result.get(MY_KEY));
+    }
+
+    @Test
+    public void testReplaceVariableWithTTL() throws Exception {
+        EasyMock.expect(worker.herder()).andReturn(herder);
+        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.NONE);
+
+        replayAll();
+
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
+    }
+
+    @Test
+    public void testReplaceVariableWithTTLAndScheduleRestart() throws Exception {
+        EasyMock.expect(worker.herder()).andReturn(herder);
+        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
+        EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
+
+        replayAll();
+
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
+    }
+
+    @Test
+    public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() throws Exception {
+        EasyMock.expect(worker.herder()).andReturn(herder);
+        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
+        EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
+
+        EasyMock.expect(worker.herder()).andReturn(herder);
+        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
+        EasyMock.expectLastCall();
+        requestId.cancel();
+        EasyMock.expectLastCall();
+        EasyMock.expect(herder.restartConnector(10L, MY_CONNECTOR, null)).andReturn(requestId);
+
+        replayAll();
+
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
+
+        result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithLongerTTL}"));
+        assertEquals(TEST_RESULT_WITH_LONGER_TTL, result.get(MY_KEY));
+    }
+
+    public static class TestConfigProvider implements ConfigProvider {
+
+        public void configure(Map<String, ?> configs) {
+        }
+
+        public ConfigData get(String path) {
+            return null;
+        }
+
+        public ConfigData get(String path, Set<String> keys) {
+            if (path.equals(TEST_PATH)) {
+                if (keys.contains(TEST_KEY)) {
+                    return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT));
+                } else if (keys.contains(TEST_KEY_WITH_TTL)) {
+                    return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
+                } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
+                    return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
+                }
+            }
+            return new ConfigData(Collections.emptyMap());
+        }
+
+        public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
+            throw new UnsupportedOperationException();
+        }
+
+        public void unsubscribe(String path, Set<String> keys) {
+            throw new UnsupportedOperationException();
+        }
+
+        public void close() {
+        }
+    }
+}
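Note on the test above: it exercises references of the form `${provider:[path:]key}`, for example `${test:testPath:testKey}`. As an illustration of how such a reference can be split and substituted, here is a small stand-alone sketch. It is not the `ConfigTransformer` implementation from this commit. The class `VariableReferenceSketch`, the regular expression, the `BiFunction`-based provider lookup, and the choice to leave unresolved references unchanged are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone sketch; not the ConfigTransformer from the commit.
class VariableReferenceSketch {

    // Assumed pattern for ${provider:[path:]key}; group 2 (the path) is optional.
    private static final Pattern VAR =
            Pattern.compile("\\$\\{([^}:]+):(?:([^}:]*):)?([^}]+)\\}");

    // Replaces each reference using the lookup registered under its provider name.
    // A lookup maps (path, key) to a value, or to null when the key is unknown.
    static String replace(String value, Map<String, BiFunction<String, String, String>> providers) {
        Matcher m = VAR.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String provider = m.group(1);
            String path = m.group(2) == null ? "" : m.group(2);
            String key = m.group(3);
            BiFunction<String, String, String> lookup = providers.get(provider);
            String resolved = lookup == null ? null : lookup.apply(path, key);
            // Sketch-only choice: unresolved references are left as written.
            String replacement = resolved != null ? resolved : m.group();
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    // One provider named "test" that knows a single path/key pair, mirroring the test constants.
    static Map<String, BiFunction<String, String, String>> demoProviders() {
        Map<String, BiFunction<String, String, String>> providers = new HashMap<>();
        providers.put("test", (path, key) ->
                "testPath".equals(path) && "testKey".equals(key) ? "testResult" : null);
        return providers;
    }

    public static void main(String[] args) {
        System.out.println(replace("${test:testPath:testKey}", demoProviders()));
        System.out.println(replace("prefix-${other:someKey}-suffix", demoProviders()));
    }
}
```

`Matcher.quoteReplacement` is used so that a resolved value containing `$` or `\` is inserted literally rather than being interpreted as a group reference.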
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index ff8507c..d23adbf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
@@ -162,7 +163,7 @@ public class WorkerSinkTaskTest {
     private void createTask(TargetState initialState) {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, metrics,
+                taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
                 keyConverter, valueConverter, headerConverter,
                 transformationChain, pluginLoader, time,
                 RetryWithToleranceOperator.NOOP_OPERATOR);
@@ -1463,5 +1464,4 @@ public class WorkerSinkTaskTest {
 
     private abstract static class TestSinkTask extends SinkTask  {
     }
-
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 61d8778..800301e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -137,7 +138,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter,
+                taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter,
                 valueConverter, headerConverter,
                 new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR),
                 pluginLoader, time, RetryWithToleranceOperator.NOOP_OPERATOR);
@@ -700,5 +701,4 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
     private static abstract class TestSinkTask extends SinkTask {
     }
-
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 77f4ad9..1482d75 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -104,6 +105,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @Mock private KafkaProducer<byte[], byte[]> producer;
     @Mock private OffsetStorageReader offsetReader;
     @Mock private OffsetStorageWriter offsetWriter;
+    @Mock private ClusterConfigState clusterConfigState;
     private WorkerSourceTask workerTask;
     @Mock private Future<RecordMetadata> sendFuture;
     @MockStrict private TaskStatus.Listener statusListener;
@@ -148,7 +150,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private void createWorkerTask(TargetState initialState) {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
-                transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM,
+                transformationChain, producer, offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
                 RetryWithToleranceOperator.NOOP_OPERATOR);
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index d29eef5..6fa7ed1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
@@ -491,6 +492,7 @@ public class WorkerTest extends ThreadedTest {
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 EasyMock.eq(config),
+                anyObject(ClusterConfigState.class),
                 anyObject(ConnectMetrics.class),
                 anyObject(ClassLoader.class),
                 anyObject(Time.class),
@@ -547,7 +549,7 @@ public class WorkerTest extends ThreadedTest {
         assertStatistics(worker, 0, 0);
         assertStartupStatistics(worker, 0, 0, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         assertStatistics(worker, 0, 1);
         assertStartupStatistics(worker, 0, 0, 1, 0);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
@@ -598,7 +600,7 @@ public class WorkerTest extends ThreadedTest {
         assertStatistics(worker, 0, 0);
         assertStartupStatistics(worker, 0, 0, 0, 0);
 
-        assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
+        assertFalse(worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
         assertStartupStatistics(worker, 0, 0, 1, 1);
 
         assertStatistics(worker, 0, 0);
@@ -629,6 +631,7 @@ public class WorkerTest extends ThreadedTest {
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 anyObject(WorkerConfig.class),
+                anyObject(ClusterConfigState.class),
                 anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
                 anyObject(Time.class),
@@ -688,7 +691,7 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
         assertStatistics(worker, 0, 0);
-        worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         assertStatistics(worker, 0, 1);
         worker.stop();
         assertStatistics(worker, 0, 0);
@@ -721,6 +724,7 @@ public class WorkerTest extends ThreadedTest {
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 anyObject(WorkerConfig.class),
+                anyObject(ClusterConfigState.class),
                 anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
                 anyObject(Time.class),
@@ -784,7 +788,7 @@ public class WorkerTest extends ThreadedTest {
         connProps.put("key.converter.extra.config", "foo");
         connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConfigurableConverter.class.getName());
         connProps.put("value.converter.extra.config", "bar");
-        worker.startTask(TASK_ID, connProps, origProps, taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, ClusterConfigState.EMPTY, connProps, origProps, taskStatusListener, TargetState.STARTED);
         assertStatistics(worker, 0, 1);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d7a7d87..911afe7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -214,7 +214,7 @@ public class DistributedHerderTest {
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
 
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -241,7 +241,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -288,7 +288,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -756,7 +756,7 @@ public class DistributedHerderTest {
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
 
@@ -770,7 +770,7 @@ public class DistributedHerderTest {
 
         worker.stopAndAwaitTask(TASK0);
         PowerMock.expectLastCall();
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
 
@@ -820,10 +820,10 @@ public class DistributedHerderTest {
 
     @Test
     public void testRequestProcessingOrder() throws Exception {
-        final DistributedHerder.HerderRequest req1 = herder.addRequest(100, null, null);
-        final DistributedHerder.HerderRequest req2 = herder.addRequest(10, null, null);
-        final DistributedHerder.HerderRequest req3 = herder.addRequest(200, null, null);
-        final DistributedHerder.HerderRequest req4 = herder.addRequest(200, null, null);
+        final DistributedHerder.DistributedHerderRequest req1 = herder.addRequest(100, null, null);
+        final DistributedHerder.DistributedHerderRequest req2 = herder.addRequest(10, null, null);
+        final DistributedHerder.DistributedHerderRequest req3 = herder.addRequest(200, null, null);
+        final DistributedHerder.DistributedHerderRequest req4 = herder.addRequest(200, null, null);
 
         assertEquals(req2, herder.requests.pollFirst()); // lowest delay
         assertEquals(req1, herder.requests.pollFirst()); // next lowest delay
@@ -1080,7 +1080,7 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -1117,7 +1117,7 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -1157,7 +1157,7 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -1210,7 +1210,7 @@ public class DistributedHerderTest {
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
                 Arrays.asList(TASK0));
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -1250,7 +1250,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index fd330f2..5372a3a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConnector;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -68,6 +69,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -325,6 +327,7 @@ public class StandaloneHerderTest {
         PowerMock.verifyAll();
     }
 
+
     @Test
     public void testRestartTask() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
@@ -337,7 +340,14 @@ public class StandaloneHerderTest {
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
 
-        worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
+        ClusterConfigState configState = new ClusterConfigState(
+                -1,
+                Collections.singletonMap(CONNECTOR_NAME, 1),
+                Collections.singletonMap(CONNECTOR_NAME, connectorConfig),
+                Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
+                Collections.singletonMap(taskId, taskConfig(SourceSink.SOURCE)),
+                new HashSet<>());
+        worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(true);
 
         PowerMock.replayAll();
@@ -363,7 +373,14 @@ public class StandaloneHerderTest {
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
 
-        worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
+        ClusterConfigState configState = new ClusterConfigState(
+                -1,
+                Collections.singletonMap(CONNECTOR_NAME, 1),
+                Collections.singletonMap(CONNECTOR_NAME, connectorConfig),
+                Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
+                Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)),
+                new HashSet<>());
+        worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(false);
 
         PowerMock.replayAll();
@@ -597,7 +614,14 @@ public class StandaloneHerderTest {
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
             .andReturn(singletonList(generatedTaskProps));
 
-        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
+        ClusterConfigState configState = new ClusterConfigState(
+                -1,
+                Collections.singletonMap(CONNECTOR_NAME, 1),
+                Collections.singletonMap(CONNECTOR_NAME, connectorConfig(sourceSink)),
+                Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
+                Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps),
+                new HashSet<>());
+        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(true);
 
         EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName()))
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index aac1b78..ed62d9b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -146,7 +146,7 @@ public class KafkaConfigBackingStoreTest {
 
     @Before
     public void setUp() {
-        configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG, null);
         Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
     }

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.
