kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: Traverse plugin path recursively in Connect (KIP-146)
Date Thu, 01 Jun 2017 09:08:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 049abe7ef -> e0150a25e


MINOR: Traverse plugin path recursively in Connect (KIP-146)

Author: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3173 from kkonstantine/MINOR-Traverse-plugin-path-recursively-in-Connect


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e0150a25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e0150a25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e0150a25

Branch: refs/heads/trunk
Commit: e0150a25e8127c282f54a0395eb2b1c80ebda94a
Parents: 049abe7
Author: Konstantine Karantasis <konstantine@confluent.io>
Authored: Thu Jun 1 02:07:53 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Jun 1 02:07:53 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/ConnectorFactory.java |  99 ----------
 .../isolation/DelegatingClassLoader.java        |  76 ++++----
 .../runtime/isolation/PluginClassLoader.java    |  17 +-
 .../connect/runtime/isolation/PluginUtils.java  | 188 ++++++++++++++++---
 .../runtime/isolation/PluginUtilsTest.java      |  10 +-
 5 files changed, 219 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e0150a25/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
deleted file mode 100644
index fd0d982..0000000
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.runtime;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.connector.Connector;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.reflections.Reflections;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
-public class ConnectorFactory {
-
-    public Connector newConnector(String connectorClassOrAlias) {
-        return instantiate(getConnectorClass(connectorClassOrAlias));
-    }
-
-    public Task newTask(Class<? extends Task> taskClass) {
-        return instantiate(taskClass);
-    }
-
-    private static <T> T instantiate(Class<? extends T> cls) {
-        try {
-            return Utils.newInstance(cls);
-        } catch (Throwable t) {
-            throw new ConnectException("Instantiation error", t);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private static Class<? extends Connector> getConnectorClass(String connectorClassOrAlias)
{
-        // Avoid the classpath scan if the full class name was provided
-        try {
-            Class<?> clazz = Class.forName(connectorClassOrAlias);
-            if (!Connector.class.isAssignableFrom(clazz))
-                throw new ConnectException("Class " + connectorClassOrAlias + " does not
implement Connector");
-            return (Class<? extends Connector>) clazz;
-        } catch (ClassNotFoundException e) {
-            // Fall through to scan for the alias
-        }
-
-        // Iterate over our entire classpath to find all the connectors and hopefully one
of them matches the alias from the connector configration
-        Reflections reflections = new Reflections(new ConfigurationBuilder()
-                .setUrls(ClasspathHelper.forJavaClassPath()));
-
-        Set<Class<? extends Connector>> connectors = reflections.getSubTypesOf(Connector.class);
-
-        List<Class<? extends Connector>> results = new ArrayList<>();
-
-        for (Class<? extends Connector> connector: connectors) {
-            // Configuration included the class name but not package
-            if (connector.getSimpleName().equals(connectorClassOrAlias))
-                results.add(connector);
-
-            // Configuration included a short version of the name (i.e. FileStreamSink instead
of FileStreamSinkConnector)
-            if (connector.getSimpleName().equals(connectorClassOrAlias + "Connector"))
-                results.add(connector);
-        }
-
-        if (results.isEmpty())
-            throw new ConnectException("Failed to find any class that implements Connector
and which name matches " + connectorClassOrAlias +
-                    ", available connectors are: " + connectorNames(connectors));
-        if (results.size() > 1) {
-            throw new ConnectException("More than one connector matches alias " +  connectorClassOrAlias
+
-                    ". Please use full package and class name instead. Classes found: " +
connectorNames(results));
-        }
-
-        // We just validated that we have exactly one result, so this is safe
-        return results.get(0);
-    }
-
-    private static String connectorNames(Collection<Class<? extends Connector>>
connectors) {
-        StringBuilder names = new StringBuilder();
-        for (Class<?> c : connectors)
-            names.append(c.getName()).append(", ");
-        return names.substring(0, names.toString().length() - 2);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0150a25/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index da8b444..ac0530e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -51,6 +51,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
 
     private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
+    private final Map<String, String> aliases;
     private final SortedSet<PluginDesc<Connector>> connectors;
     private final SortedSet<PluginDesc<Converter>> converters;
     private final SortedSet<PluginDesc<Transformation>> transformations;
@@ -61,6 +62,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         super(new URL[0], parent);
         this.pluginPaths = pluginPaths;
         this.pluginLoaders = new HashMap<>();
+        this.aliases = new HashMap<>();
         this.activePaths = new HashMap<>();
         this.connectors = new TreeSet<>();
         this.converters = new TreeSet<>();
@@ -89,8 +91,10 @@ public class DelegatingClassLoader extends URLClassLoader {
 
     public ClassLoader connectorLoader(String connectorClassOrAlias) {
         log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias);
-        SortedMap<PluginDesc<?>, ClassLoader> inner =
-                pluginLoaders.get(connectorClassOrAlias);
+        String fullName = aliases.containsKey(connectorClassOrAlias)
+                          ? aliases.get(connectorClassOrAlias)
+                          : connectorClassOrAlias;
+        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
         if (inner == null) {
             log.error(
                     "Plugin class loader for connector: '{}' was not found. Returning: {}",
@@ -137,23 +141,16 @@ public class DelegatingClassLoader extends URLClassLoader {
             for (String configPath : pluginPaths) {
                 path = configPath;
                 Path pluginPath = Paths.get(path).toAbsolutePath();
+                // Update for exception handling
+                path = pluginPath.toString();
                 // Currently 'plugin.paths' property is a list of top-level directories
                 // containing plugins
                 if (Files.isDirectory(pluginPath)) {
                     for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) {
-                        log.info("Loading plugin from: {}", pluginLocation);
-                        URL[] urls = PluginUtils.pluginUrls(pluginLocation).toArray(new URL[0]);
-                        if (log.isDebugEnabled()) {
-                            log.debug("Loading plugin urls: {}", Arrays.toString(urls));
-                        }
-                        PluginClassLoader loader = newPluginClassLoader(
-                                pluginLocation.toUri().toURL(),
-                                urls,
-                                this
-                        );
-
-                        scanUrlsAndAddPlugins(loader, urls, pluginLocation);
+                        registerPlugin(pluginLocation);
                     }
+                } else if (PluginUtils.isArchive(pluginPath)) {
+                    registerPlugin(pluginPath);
                 }
             }
 
@@ -165,15 +162,34 @@ public class DelegatingClassLoader extends URLClassLoader {
                     null
             );
         } catch (InvalidPathException | MalformedURLException e) {
-            log.error("Invalid path in plugin path: {}. Ignoring.", path);
+            log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
         } catch (IOException e) {
-            log.error("Could not get listing for plugin path: {}. Ignoring.", path);
+            log.error("Could not get listing for plugin path: {}. Ignoring.", path, e);
         } catch (InstantiationException | IllegalAccessException e) {
             log.error("Could not instantiate plugins in: {}. Ignoring: {}", path, e);
         }
         addAllAliases();
     }
 
+    private void registerPlugin(Path pluginLocation)
+            throws InstantiationException, IllegalAccessException, IOException {
+        log.info("Loading plugin from: {}", pluginLocation);
+        List<URL> pluginUrls = new ArrayList<>();
+        for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
+            pluginUrls.add(path.toUri().toURL());
+        }
+        URL[] urls = pluginUrls.toArray(new URL[0]);
+        if (log.isDebugEnabled()) {
+            log.debug("Loading plugin urls: {}", Arrays.toString(urls));
+        }
+        PluginClassLoader loader = newPluginClassLoader(
+                pluginLocation.toUri().toURL(),
+                urls,
+                this
+        );
+        scanUrlsAndAddPlugins(loader, urls, pluginLocation);
+    }
+
     private void scanUrlsAndAddPlugins(
             ClassLoader loader,
             URL[] urls,
@@ -245,28 +261,17 @@ public class DelegatingClassLoader extends URLClassLoader {
             return super.loadClass(name, resolve);
         }
 
-        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
+        String fullName = aliases.containsKey(name) ? aliases.get(name) : name;
+        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
         if (inner != null) {
-            log.trace("Retrieving loaded class '{}' from '{}'", name, inner.get(inner.lastKey()));
             ClassLoader pluginLoader = inner.get(inner.lastKey());
+            log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader);
             return pluginLoader instanceof PluginClassLoader
-                   ? ((PluginClassLoader) pluginLoader).loadClass(name, resolve)
-                   : super.loadClass(name, resolve);
+                   ? ((PluginClassLoader) pluginLoader).loadClass(fullName, resolve)
+                   : super.loadClass(fullName, resolve);
         }
 
-        Class<?> klass = null;
-        for (PluginClassLoader loader : activePaths.values()) {
-            try {
-                klass = loader.loadClass(name, resolve);
-                break;
-            } catch (ClassNotFoundException e) {
-                // Not found in this loader.
-            }
-        }
-        if (klass == null) {
-            return super.loadClass(name, resolve);
-        }
-        return klass;
+        return super.loadClass(fullName, resolve);
     }
 
     private void addAllAliases() {
@@ -280,12 +285,11 @@ public class DelegatingClassLoader extends URLClassLoader {
             if (PluginUtils.isAliasUnique(plugin, plugins)) {
                 String simple = PluginUtils.simpleName(plugin);
                 String pruned = PluginUtils.prunedName(plugin);
-                SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(plugin.className());
-                pluginLoaders.put(simple, inner);
+                aliases.put(simple, plugin.className());
                 if (simple.equals(pruned)) {
                     log.info("Added alias '{}' to plugin '{}'", simple, plugin.className());
                 } else {
-                    pluginLoaders.put(pruned, inner);
+                    aliases.put(pruned, plugin.className());
                     log.info(
                             "Added aliases '{}' and '{}' to plugin '{}'",
                             simple,

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0150a25/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
index 07438e9..780ebd0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
@@ -23,7 +23,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 
 public class PluginClassLoader extends URLClassLoader {
-    private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
+    private static final Logger log = LoggerFactory.getLogger(PluginClassLoader.class);
     private final URL pluginLocation;
 
     public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
@@ -49,16 +49,17 @@ public class PluginClassLoader extends URLClassLoader {
     protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException
{
         Class<?> klass = findLoadedClass(name);
         if (klass == null) {
-            if (PluginUtils.shouldLoadInIsolation(name)) {
-                try {
+            try {
+                if (PluginUtils.shouldLoadInIsolation(name)) {
                     klass = findClass(name);
-                } catch (ClassNotFoundException e) {
-                    // Not found in loader's path. Search in parents.
                 }
+            } catch (ClassNotFoundException e) {
+                // Not found in loader's path. Search in parents.
+                log.trace("Class '{}' not found. Delegating to parent", name);
             }
-            if (klass == null) {
-                klass = super.loadClass(name, false);
-            }
+        }
+        if (klass == null) {
+            klass = super.loadClass(name, false);
         }
         if (resolve) {
             resolveClass(klass);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0150a25/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index b2be997..edc1636 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -16,26 +16,107 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.lang.reflect.Modifier;
-import java.net.URL;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
+import java.util.Set;
 
 public class PluginUtils {
+    private static final Logger log = LoggerFactory.getLogger(PluginUtils.class);
+
+    // Be specific about javax packages and exclude those existing in Java SE and Java EE
libraries.
     private static final String BLACKLIST = "^(?:"
             + "java"
-            + "|javax"
-            + "|org\\.omg"
+            + "|javax\\.accessibility"
+            + "|javax\\.activation"
+            + "|javax\\.activity"
+            + "|javax\\.annotation"
+            + "|javax\\.batch\\.api"
+            + "|javax\\.batch\\.operations"
+            + "|javax\\.batch\\.runtime"
+            + "|javax\\.crypto"
+            + "|javax\\.decorator"
+            + "|javax\\.ejb"
+            + "|javax\\.el"
+            + "|javax\\.enterprise\\.concurrent"
+            + "|javax\\.enterprise\\.context"
+            + "|javax\\.enterprise\\.context\\.spi"
+            + "|javax\\.enterprise\\.deploy\\.model"
+            + "|javax\\.enterprise\\.deploy\\.shared"
+            + "|javax\\.enterprise\\.deploy\\.spi"
+            + "|javax\\.enterprise\\.event"
+            + "|javax\\.enterprise\\.inject"
+            + "|javax\\.enterprise\\.inject\\.spi"
+            + "|javax\\.enterprise\\.util"
+            + "|javax\\.faces"
+            + "|javax\\.imageio"
+            + "|javax\\.inject"
+            + "|javax\\.interceptor"
+            + "|javax\\.jms"
+            + "|javax\\.json"
+            + "|javax\\.jws"
+            + "|javax\\.lang\\.model"
+            + "|javax\\.mail"
+            + "|javax\\.management"
+            + "|javax\\.management\\.j2ee"
+            + "|javax\\.naming"
+            + "|javax\\.net"
+            + "|javax\\.persistence"
+            + "|javax\\.print"
+            + "|javax\\.resource"
+            + "|javax\\.rmi"
+            + "|javax\\.script"
+            + "|javax\\.security\\.auth"
+            + "|javax\\.security\\.auth\\.message"
+            + "|javax\\.security\\.cert"
+            + "|javax\\.security\\.jacc"
+            + "|javax\\.security\\.sasl"
+            + "|javax\\.servlet"
+            + "|javax\\.sound\\.midi"
+            + "|javax\\.sound\\.sampled"
+            + "|javax\\.sql"
+            + "|javax\\.swing"
+            + "|javax\\.tools"
+            + "|javax\\.transaction"
+            + "|javax\\.validation"
+            + "|javax\\.websocket"
+            + "|javax\\.ws\\.rs"
+            + "|javax\\.xml"
+            + "|javax\\.xml\\.bind"
+            + "|javax\\.xml\\.registry"
+            + "|javax\\.xml\\.rpc"
+            + "|javax\\.xml\\.soap"
+            + "|javax\\.xml\\.ws"
+            + "|org\\.ietf\\.jgss"
+            + "|org\\.omg\\.CORBA"
+            + "|org\\.omg\\.CosNaming"
+            + "|org\\.omg\\.Dynamic"
+            + "|org\\.omg\\.DynamicAny"
+            + "|org\\.omg\\.IOP"
+            + "|org\\.omg\\.Messaging"
+            + "|org\\.omg\\.PortableInterceptor"
+            + "|org\\.omg\\.PortableServer"
+            + "|org\\.omg\\.SendingContext"
+            + "|org\\.omg\\.stub\\.java\\.rmi"
             + "|org\\.w3c\\.dom"
+            + "|org\\.xml\\.sax"
             + "|org\\.apache\\.kafka\\.common"
             + "|org\\.apache\\.kafka\\.connect"
-            + "|org\\.apache\\.log4j"
+            + "|org\\.slf4j"
             + ")\\..*$";
 
     private static final String WHITELIST = "^org\\.apache\\.kafka\\.connect\\.(?:"
@@ -50,7 +131,7 @@ public class PluginUtils {
             .Filter<Path>() {
         @Override
         public boolean accept(Path path) throws IOException {
-            return Files.isDirectory(path) || PluginUtils.isJar(path);
+            return Files.isDirectory(path) || isArchive(path) || isClassFile(path);
         }
     };
 
@@ -63,32 +144,16 @@ public class PluginUtils {
         return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
     }
 
-    public static boolean isJar(Path path) {
+    public static boolean isArchive(Path path) {
         return path.toString().toLowerCase(Locale.ROOT).endsWith(".jar");
     }
 
-    public static List<URL> pluginUrls(Path pluginPath) throws IOException {
-        List<URL> urls = new ArrayList<>();
-        if (PluginUtils.isJar(pluginPath)) {
-            urls.add(pluginPath.toUri().toURL());
-        } else if (Files.isDirectory(pluginPath)) {
-            try (
-                    DirectoryStream<Path> listing = Files.newDirectoryStream(
-                            pluginPath,
-                            PLUGIN_PATH_FILTER
-                    )
-            ) {
-                for (Path jar : listing) {
-                    urls.add(jar.toUri().toURL());
-                }
-            }
-        }
-        return urls;
+    public static boolean isClassFile(Path path) {
+        return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
     }
 
     public static List<Path> pluginLocations(Path topPath) throws IOException {
         List<Path> locations = new ArrayList<>();
-        // Non-recursive for now. Plugin directories or jars need to be exactly under the
topPath.
         try (
                 DirectoryStream<Path> listing = Files.newDirectoryStream(
                         topPath,
@@ -102,6 +167,71 @@ public class PluginUtils {
         return locations;
     }
 
+    public static List<Path> pluginUrls(Path topPath) throws IOException {
+        boolean containsClassFiles = false;
+        Set<Path> archives = new HashSet<>();
+        LinkedList<DirectoryEntry> dfs = new LinkedList<>();
+        Set<Path> visited = new HashSet<>();
+
+        if (isArchive(topPath)) {
+            return Collections.singletonList(topPath);
+        }
+
+        DirectoryStream<Path> topListing = Files.newDirectoryStream(
+                topPath,
+                PLUGIN_PATH_FILTER
+        );
+        dfs.push(new DirectoryEntry(topListing));
+        visited.add(topPath);
+        try {
+            while (!dfs.isEmpty()) {
+                Iterator<Path> neighbors = dfs.peek().iterator;
+                if (!neighbors.hasNext()) {
+                    dfs.pop().stream.close();
+                    continue;
+                }
+
+                Path adjacent = neighbors.next();
+                if (Files.isSymbolicLink(adjacent)) {
+                    Path absolute = Files.readSymbolicLink(adjacent).toRealPath();
+                    if (Files.exists(absolute)) {
+                        adjacent = absolute;
+                    } else {
+                        continue;
+                    }
+                }
+
+                if (!visited.contains(adjacent)) {
+                    visited.add(adjacent);
+                    if (isArchive(adjacent)) {
+                        archives.add(adjacent);
+                    } else if (isClassFile(adjacent)) {
+                        containsClassFiles = true;
+                    } else {
+                        DirectoryStream<Path> listing = Files.newDirectoryStream(
+                                adjacent,
+                                PLUGIN_PATH_FILTER
+                        );
+                        dfs.push(new DirectoryEntry(listing));
+                    }
+                }
+            }
+        } finally {
+            while (!dfs.isEmpty()) {
+                dfs.pop().stream.close();
+            }
+        }
+
+        if (containsClassFiles) {
+            if (archives.isEmpty()) {
+                return Collections.singletonList(topPath);
+            }
+            log.warn("Plugin path contains both java archives and class files. Returning
only the"
+                    + " archives");
+        }
+        return Arrays.asList(archives.toArray(new Path[0]));
+    }
+
     public static String simpleName(PluginDesc<?> plugin) {
         return plugin.pluginClass().getSimpleName();
     }
@@ -144,4 +274,14 @@ public class PluginUtils {
         return simple;
     }
 
+    private static class DirectoryEntry {
+        DirectoryStream<Path> stream;
+        Iterator<Path> iterator;
+
+        DirectoryEntry(DirectoryStream<Path> stream) {
+            this.stream = stream;
+            this.iterator = stream.iterator();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0150a25/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index c943863..a49e54c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -39,20 +39,22 @@ public class PluginUtilsTest {
         assertFalse(PluginUtils.shouldLoadInIsolation("java.lang.String"));
         assertFalse(PluginUtils.shouldLoadInIsolation("java.util.HashMap$Entry"));
         assertFalse(PluginUtils.shouldLoadInIsolation("java.io.Serializable"));
-        assertFalse(PluginUtils.shouldLoadInIsolation("javax."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("javax.rmi."));
         assertFalse(PluginUtils.shouldLoadInIsolation(
                 "javax.management.loading.ClassLoaderRepository")
         );
-        assertFalse(PluginUtils.shouldLoadInIsolation("org.omg."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.omg.CORBA."));
         assertFalse(PluginUtils.shouldLoadInIsolation("org.omg.CORBA.Object"));
         assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom."));
         assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom.traversal.TreeWalker"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.xml.sax."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.xml.sax.EntityResolver"));
     }
 
     @Test
     public void testThirdPartyClasses() throws Exception {
-        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j."));
-        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j.Level"));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.slf4j."));
+        assertFalse(PluginUtils.shouldLoadInIsolation("org.slf4j.LoggerFactory"));
     }
 
     @Test


Mime
View raw message