kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [1/3] kafka git commit: KAFKA-3487: Support classloading isolation in Connect (KIP-146)
Date Thu, 18 May 2017 17:39:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5aaaba7ff -> 45f226176


http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
new file mode 100644
index 0000000..c943863
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class PluginUtilsTest {
+    @Rule
+    public TemporaryFolder rootDir = new TemporaryFolder();
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @Test
+    public void testJavaLibraryClasses() throws Exception {
+        assertFalse(PluginUtils.shouldLoadInIsolation("java."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("java.lang.Object"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("java.lang.String"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("java.util.HashMap$Entry"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("java.io.Serializable"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("javax."));
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "javax.management.loading.ClassLoaderRepository")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.omg."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.omg.CORBA.Object"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom.traversal.TreeWalker"));
+    }
+
+    @Test
+    public void testThirdPartyClasses() throws Exception {
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j.Level"));
+    }
+
+    @Test
+    public void testConnectFrameworkClasses() throws Exception {
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.common."));
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.common.config.AbstractConfig")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.common.config.ConfigDef$Type")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.common.serialization.Deserializer")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect."));
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.connector.Connector")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.source.SourceConnector")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.sink.SinkConnector")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.connector.Task"));
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.source.SourceTask")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.sink.SinkTask"));
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.transforms.Transformation")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.storage.Converter")
+        );
+        assertFalse(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.storage.OffsetBackingStore")
+        );
+    }
+
+    @Test
+    public void testAllowedConnectFrameworkClasses() throws Exception {
+        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms."));
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.transforms.ExtractField")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.transforms.ExtractField$Key")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json."));
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.json.JsonConverter")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.json.JsonConverter$21")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.file."));
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.file.FileStreamSourceTask")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.file.FileStreamSinkConnector")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.converters."));
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.converters.ByteArrayConverter")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.storage.StringConverter")
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 966098c..2d0448e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -31,6 +30,9 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -57,7 +59,8 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.io.IOException;
+import javax.ws.rs.BadRequestException;
+import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -66,8 +69,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import javax.ws.rs.BadRequestException;
+import java.util.TreeSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -88,12 +90,14 @@ public class ConnectorPluginsResourceTest {
 
         props = new HashMap<>(partialProps);
         props.put("connector.class", ConnectorPluginsResourceTestConnector.class.getSimpleName());
+        props.put("plugin.path", null);
     }
 
     private static final ConfigInfos CONFIG_INFOS;
     private static final ConfigInfos PARTIAL_CONFIG_INFOS;
     private static final int ERROR_COUNT = 0;
     private static final int PARTIAL_CONFIG_ERROR_COUNT = 1;
+    private static final Set<PluginDesc<Connector>> CONNECTOR_PLUGINS = new TreeSet<>();
 
     static {
         List<ConfigInfo> configs = new LinkedList<>();
@@ -133,19 +137,58 @@ public class ConnectorPluginsResourceTest {
 
         CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(),
ERROR_COUNT, Collections.singletonList("Test"), configs);
         PARTIAL_CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(),
PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs);
+
+        Class<?>[] abstractConnectorClasses = {
+            Connector.class,
+            SourceConnector.class,
+            SinkConnector.class
+        };
+
+        Class<?>[] connectorClasses = {
+            VerifiableSourceConnector.class,
+            VerifiableSinkConnector.class,
+            MockSourceConnector.class,
+            MockSinkConnector.class,
+            MockConnector.class,
+            SchemaSourceConnector.class,
+            ConnectorPluginsResourceTestConnector.class
+        };
+
+        try {
+            for (Class<?> klass : abstractConnectorClasses) {
+                CONNECTOR_PLUGINS.add(
+                        new MockConnectorPluginDesc((Class<? extends Connector>) klass,
"0.0.0"));
+            }
+            for (Class<?> klass : connectorClasses) {
+                CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc((Class<? extends Connector>)
klass));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Mock
     private Herder herder;
+    @Mock
+    private Plugins plugins;
     private ConnectorPluginsResource connectorPluginsResource;
 
     @Before
-    public void setUp() throws NoSuchMethodException {
+    public void setUp() throws Exception {
         PowerMock.mockStatic(RestServer.class,
                              RestServer.class.getMethod("httpRequest", String.class, String.class,
Object.class, TypeReference.class));
+
+        plugins = PowerMock.createMock(Plugins.class);
+        herder = PowerMock.createMock(AbstractHerder.class);
         connectorPluginsResource = new ConnectorPluginsResource(herder);
     }
 
+    private void expectPlugins() {
+        EasyMock.expect(herder.plugins()).andReturn(plugins);
+        EasyMock.expect(plugins.connectors()).andReturn(CONNECTOR_PLUGINS);
+        PowerMock.replayAll();
+    }
+
     @Test
     public void testValidateConfigWithSingleErrorDueToMissingConnectorClassname() throws
Throwable {
         herder.validateConnectorConfig(EasyMock.eq(partialProps));
@@ -359,27 +402,30 @@ public class ConnectorPluginsResourceTest {
     }
 
     @Test
-    public void testListConnectorPlugins() {
+    public void testListConnectorPlugins() throws Exception {
+        expectPlugins();
         Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins());
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class)));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class)));
-        assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(Connector.class, "0.0")));
+        assertFalse(connectorPlugins.contains(newInfo(SourceConnector.class, "0.0")));
+        assertFalse(connectorPlugins.contains(newInfo(SinkConnector.class, "0.0")));
+        assertFalse(connectorPlugins.contains(newInfo(VerifiableSourceConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(VerifiableSinkConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(MockSourceConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(MockSinkConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(MockConnector.class)));
+        assertFalse(connectorPlugins.contains(newInfo(SchemaSourceConnector.class)));
+        assertTrue(connectorPlugins.contains(newInfo(ConnectorPluginsResourceTestConnector.class)));
+        PowerMock.verifyAll();
     }
 
     @Test
-    public void testConnectorPluginsIncludesTypeAndVersionInformation()
-        throws IOException {
-        ConnectorPluginInfo sinkInfo = new ConnectorPluginInfo(TestSinkConnector.class);
-        ConnectorPluginInfo sourceInfo = new ConnectorPluginInfo(TestSourceConnector.class);
+    public void testConnectorPluginsIncludesTypeAndVersionInformation() throws Exception
{
+        expectPlugins();
+        ConnectorPluginInfo sinkInfo = newInfo(TestSinkConnector.class);
+        ConnectorPluginInfo sourceInfo =
+                newInfo(TestSourceConnector.class);
         ConnectorPluginInfo unkownInfo =
-            new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class);
+            newInfo(ConnectorPluginsResourceTestConnector.class);
         assertEquals(ConnectorType.SINK, sinkInfo.type());
         assertEquals(ConnectorType.SOURCE, sourceInfo.type());
         assertEquals(ConnectorType.UNKNOWN, unkownInfo.type());
@@ -407,6 +453,46 @@ public class ConnectorPluginsResourceTest {
         );
     }
 
+    protected static ConnectorPluginInfo newInfo(Class<? extends Connector> klass,
String version)
+            throws Exception {
+        return new ConnectorPluginInfo(new MockConnectorPluginDesc(klass, version));
+    }
+
+    protected static ConnectorPluginInfo newInfo(Class<? extends Connector> klass)
+            throws Exception {
+        return new ConnectorPluginInfo(new MockConnectorPluginDesc(klass));
+    }
+
+    public static class MockPluginClassLoader extends PluginClassLoader {
+        public MockPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent)
{
+            super(pluginLocation, urls, parent);
+        }
+
+        public MockPluginClassLoader(URL pluginLocation, URL[] urls) {
+            super(pluginLocation, urls);
+        }
+
+        @Override
+        public String location() {
+            return "/tmp/mockpath";
+        }
+    }
+
+    public static class MockConnectorPluginDesc extends PluginDesc<Connector> {
+        public MockConnectorPluginDesc(Class<? extends Connector> klass, String version)
+                throws Exception {
+            super(klass, version, new MockPluginClassLoader(null, new URL[0]));
+        }
+
+        public MockConnectorPluginDesc(Class<? extends Connector> klass) throws Exception
{
+            super(
+                    klass,
+                    klass.newInstance().version(),
+                    new MockPluginClassLoader(null, new URL[0])
+            );
+        }
+    }
+
     public static class TestSinkConnector extends SinkConnector {
 
         static final String VERSION = "some great version";

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index da1edbc..1c3dddb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.ConnectorFactory;
 import org.apache.kafka.connect.runtime.ConnectorStatus;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
@@ -35,6 +34,9 @@ import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
@@ -54,6 +56,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
@@ -75,6 +78,7 @@ import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
 @SuppressWarnings("unchecked")
+@PrepareForTest({StandaloneHerder.class, Plugins.class})
 public class StandaloneHerderTest {
     private static final String CONNECTOR_NAME = "test";
     private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
@@ -90,12 +94,21 @@ public class StandaloneHerderTest {
 
     private Connector connector;
     @Mock protected Worker worker;
+    @Mock private Plugins plugins;
+    @Mock
+    private PluginClassLoader pluginLoader;
+    @Mock
+    private DelegatingClassLoader delegatingLoader;
     @Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback;
     @Mock protected StatusBackingStore statusBackingStore;
 
     @Before
     public void setup() {
         herder = new StandaloneHerder(worker, WORKER_ID, statusBackingStore, new MemoryConfigBackingStore());
+        plugins = PowerMock.createMock(Plugins.class);
+        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
+        delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
+        PowerMock.mockStatic(Plugins.class);
     }
 
     @Test
@@ -120,12 +133,12 @@ public class StandaloneHerderTest {
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
 
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
-        EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
@@ -141,11 +154,10 @@ public class StandaloneHerderTest {
     public void testCreateConnectorFailedCustomValidation() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
 
-        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
 
         ConfigDef configDef = new ConfigDef();
         configDef.define("foo.bar", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "foo.bar
doc");
@@ -153,7 +165,9 @@ public class StandaloneHerderTest {
 
         ConfigValue validatedValue = new ConfigValue("foo.bar");
         validatedValue.addErrorMessage("Failed foo.bar validation");
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue)));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
 
         createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
@@ -172,8 +186,13 @@ public class StandaloneHerderTest {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        expectConfigValidation(config, config);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        expectConfigValidation(connectorMock, true, config, config);
 
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        // No new connector is created
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
         // Second should fail
         createCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
@@ -435,7 +454,8 @@ public class StandaloneHerderTest {
         // Create
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
-        expectConfigValidation(connConfig, newConnConfig);
+        Connector connectorMock = PowerMock.createMock(Connector.class);
+        expectConfigValidation(connectorMock, true, connConfig);
 
         // Should get first config
         connectorConfigCb.onCompletion(null, connConfig);
@@ -457,6 +477,7 @@ public class StandaloneHerderTest {
         putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, newConnInfo));
         EasyMock.expectLastCall();
         // Should get new config
+        expectConfigValidation(connectorMock, false, newConnConfig);
         connectorConfigCb.onCompletion(null, newConnConfig);
         EasyMock.expectLastCall();
 
@@ -501,11 +522,12 @@ public class StandaloneHerderTest {
         );
         ConfigDef configDef = new ConfigDef();
         configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andStubReturn(configDef);
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString()))
-            .andReturn(connectorMock);
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
         Callback<Herder.Created<ConnectorInfo>> callback = PowerMock.createMock(Callback.class);
         Capture<BadRequestException> capture = Capture.newInstance();
         callback.onCompletion(
@@ -588,15 +610,27 @@ public class StandaloneHerderTest {
 
 
     private void expectConfigValidation(Map<String, String> ... configs) {
-        // config validation
-        ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class);
-        EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock);
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        expectConfigValidation(connectorMock, true, configs);
+    }
+
+    private void expectConfigValidation(
+            Connector connectorMock,
+            boolean shouldCreateConnector,
+            Map<String, String>... configs
+    ) {
+        // config validation
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
+        EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
+        if (shouldCreateConnector) {
+            EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+            EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+        }
         EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
 
         for (Map<String, String> config : configs)
             EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
+        EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
     }
 
     // We need to use a real class here due to some issue with mocking java.lang.Class

http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index cdd8c23..64ea09e 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -71,6 +71,7 @@ versions += [
   zkclient: "0.10",
   zookeeper: "3.4.10",
   jfreechart: "1.0.0",
+  mavenArtifact: "3.5.0",
 ]
 
 libs += [
@@ -112,5 +113,6 @@ libs += [
   snappy: "org.xerial.snappy:snappy-java:$versions.snappy",
   zkclient: "com.101tec:zkclient:$versions.zkclient",
   zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper",
-  jfreechart: "jfreechart:jfreechart:$versions.jfreechart"
+  jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
+  mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact"
 ]


Mime
View raw message