kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3316: Add REST API for listing connector plugins
Date Fri, 25 Mar 2016 23:47:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 496bd3fd4 -> 206757eeb


KAFKA-3316: Add REST API for listing connector plugins

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1090 from Ishiihara/kafka-3316

(cherry picked from commit 78fa20eb58a948abd9ad4e44acfed606400a47f3)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/206757ee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/206757ee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/206757ee

Branch: refs/heads/0.10.0
Commit: 206757eeb18589952291ce1a5578c66de3669f4c
Parents: 496bd3f
Author: Liquan Pei <liquanpei@gmail.com>
Authored: Fri Mar 25 16:46:53 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Mar 25 16:47:09 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/AbstractHerder.java   | 32 +++++++++++-
 .../rest/entities/ConnectorPluginInfo.java      | 54 ++++++++++++++++++++
 .../resources/ConnectorPluginsResource.java     | 10 ++++
 .../connect/runtime/AbstractHerderTest.java     |  1 -
 .../resources/ConnectorPluginsResourceTest.java | 22 +++++++-
 5 files changed, 116 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/206757ee/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 8d83644..a97c4db 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -26,20 +26,29 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.tools.VerifiableSinkConnector;
+import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.reflections.Reflections;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -69,7 +78,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener,
Con
     protected final StatusBackingStore statusBackingStore;
     private final String workerId;
 
-    protected Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
+    private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
+    private static final List<Class<? extends Connector>> SKIPPED_CONNECTORS
= Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class);
+    private static List<ConnectorPluginInfo> validConnectorPlugins;
 
     public AbstractHerder(Worker worker, StatusBackingStore statusBackingStore, String workerId)
{
         this.worker = worker;
@@ -189,6 +200,25 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener,
Con
         return generateResult(connType, resultConfigKeys, configValues, allGroups);
     }
 
+    public static List<ConnectorPluginInfo> connectorPlugins() {
+        if (validConnectorPlugins != null) {
+            return validConnectorPlugins;
+        }
+
+        Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
+        Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
+        connectorClasses.removeAll(SKIPPED_CONNECTORS);
+        List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>();
+        for (Class<? extends Connector> connectorClass: connectorClasses) {
+            int mod = connectorClass.getModifiers();
+            if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) {
+                connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
+            }
+        }
+        validConnectorPlugins = connectorPlugins;
+        return connectorPlugins;
+    }
+
     // public for testing
     public static ConfigInfos generateResult(String connType, Map<String, ConfigKey>
configKeys, List<ConfigValue> configValues, List<String> groups) {
         int errorCount = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/206757ee/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
new file mode 100644
index 0000000..097142e
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may
obtain a
+ * copy of the License at <p/> http://www.apache.org/licenses/LICENSE-2.0 <p/>
Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed
on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See
+ * the License for the specific language governing permissions and limitations under the
License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class ConnectorPluginInfo {
+
+    private String clazz;
+
+    @JsonCreator
+    public ConnectorPluginInfo(@JsonProperty("class") String clazz) {
+        this.clazz = clazz;
+    }
+
+    @JsonProperty("class")
+    public String clazz() {
+        return clazz;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ConnectorPluginInfo that = (ConnectorPluginInfo) o;
+        return Objects.equals(clazz, that.clazz);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clazz);
+    }
+
+    @Override
+    public String toString() {
+        return clazz;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/206757ee/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 8439707..9e87d0c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -17,12 +17,16 @@
 
 package org.apache.kafka.connect.runtime.rest.resources;
 
+import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 
+import java.util.List;
 import java.util.Map;
 
 import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -46,4 +50,10 @@ public class ConnectorPluginsResource {
                                        final Map<String, String> connectorConfig) throws
Throwable {
         return herder.validateConfigs(connType, connectorConfig);
     }
+
+    @GET
+    @Path("/")
+    public List<ConnectorPluginInfo> listConnectorPlugins() {
+        return AbstractHerder.connectorPlugins();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/206757ee/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 1dc5784..e4084a8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -114,5 +114,4 @@ public class AbstractHerderTest extends EasyMockSupport {
 
         verifyAll();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/206757ee/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 625c91f..1049e7e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
 
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.AbstractHerder;
@@ -32,6 +32,11 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.tools.VerifiableSinkConnector;
+import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Before;
@@ -49,8 +54,11 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(RestServer.class)
@@ -64,6 +72,7 @@ public class ConnectorPluginsResourceTest {
     }
 
     private static final ConfigInfos CONFIG_INFOS;
+
     static {
         List<ConfigInfo> configs = new LinkedList<>();
 
@@ -120,6 +129,17 @@ public class ConnectorPluginsResourceTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testListConnectorPlugins() {
+        Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins());
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class.getCanonicalName())));
+        assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName())));
+    }
+
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class ConnectorPluginsResourceTestConnector extends Connector {
 


Mime
View raw message