kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-8550: Fix plugin loading of aliased converters in Connect (#6959)
Date Sun, 11 Aug 2019 15:23:35 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 04cf8fb  KAFKA-8550: Fix plugin loading of aliased converters in Connect (#6959)
04cf8fb is described below

commit 04cf8fb2c116812abaa1b348d5e30536b4a1061c
Author: Chris Egerton <chrise@confluent.io>
AuthorDate: Sun Aug 11 07:56:05 2019 -0700

    KAFKA-8550: Fix plugin loading of aliased converters in Connect (#6959)
    
    Connector validation fails if an alias is used for the converter since the validation
    for that is done via `ConfigDef.validateAll(...)`, which in turn invokes `Class.forName(...)`
    on the alias. Even though the class is successfully loaded by the DelegatingClassLoader, some
    Java implementations will refuse to return a class from `Class.forName(...)` whose name differs
    from the argument provided.
    
    This commit alters `ConfigDef.parseType(...)` to first invoke `ClassLoader.loadClass(...)`
    on the class using our class loader in order to get a handle on the actual class object to
    be loaded, then invoke `Class.forName(...)` with the fully-qualified class name of the to-be-loaded
    class and return the result. The invocation of `Class.forName(...)` is necessary in order
    to allow static initialization to take place; simply calling `ClassLoader.loadClass(...)`
    is insufficient.
    
    Also corrected a unit test that relied upon the old behavior.
    
    Author: Chris Egerton <chrise@confluent.io>
    Reviewers: Robert Yokota <rayokota@gmail.com>, Randall Hauch <rhauch@gmail.com>
---
 .../org/apache/kafka/common/config/ConfigDef.java  | 13 +++++++++---
 .../kafka/common/config/AbstractConfigTest.java    |  2 +-
 .../apache/kafka/common/config/ConfigDefTest.java  | 23 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 362b1a6..fa99931 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -707,9 +707,16 @@ public class ConfigDef {
                 case CLASS:
                     if (value instanceof Class)
                         return value;
-                    else if (value instanceof String)
-                        return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader());
-                    else
+                    else if (value instanceof String) {
+                        ClassLoader contextOrKafkaClassLoader = Utils.getContextOrKafkaClassLoader();
+                        // Use loadClass here instead of Class.forName because the name we use here may be an alias
+                        // and not match the name of the class that gets loaded. If that happens, Class.forName can
+                        // throw an exception.
+                        Class<?> klass = contextOrKafkaClassLoader.loadClass(trimmed);
+                        // Invoke forName here with the true name of the requested class to cause class
+                        // initialization to take place.
+                        return Class.forName(klass.getName(), true, contextOrKafkaClassLoader);
+                    } else
                         throw new ConfigException(name, value, "Expected a Class instance or class name.");
                 default:
                     throw new IllegalStateException("Unknown type.");
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index f686475..b5fff6f 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -265,7 +265,7 @@ public class AbstractConfigTest {
             @Override
             protected Class<?> findClass(String name) throws ClassNotFoundException {
                 if (name.equals(ClassTestConfig.DEFAULT_CLASS.getName()) || name.equals(ClassTestConfig.RESTRICTED_CLASS.getName()))
-                    return null;
+                    throw new ClassNotFoundException();
                 else
                     return ClassTestConfig.class.getClassLoader().loadClass(name);
             }
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 974d39f..701e9f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -639,6 +639,29 @@ public class ConfigDefTest {
         assertEquals(NestedClass.class, Class.forName(actual));
     }
 
+    @Test
+    public void testClassWithAlias() {
+        final String alias = "PluginAlias";
+        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            // Could try to use the Plugins class from Connect here, but this should simulate enough
+            // of the aliasing logic to suffice for this test.
+            Thread.currentThread().setContextClassLoader(new ClassLoader(originalClassLoader) {
+                @Override
+                public Class loadClass(String name, boolean resolve) throws ClassNotFoundException {
+                    if (alias.equals(name)) {
+                        return NestedClass.class;
+                    } else {
+                        return super.loadClass(name, resolve);
+                    }
+                }
+            });
+            ConfigDef.parseType("Test config", alias, Type.CLASS);
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+
     private class NestedClass {
     }
 }


Mime
View raw message