kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-5657: Connect REST API should include the connector type when describing a connector (KIP-151)
Date Wed, 20 Sep 2017 21:01:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 239dad1b9 -> 552b17078


KAFKA-5657: Connect REST API should include the connector type when describing a connector (KIP-151)

Embed the connector type (source or sink) in ConnectorInfo and ConnectorStateInfo

Author: tedyu <yuzhihong@gmail.com>

Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>,
Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3812 from tedyu/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/552b1707
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/552b1707
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/552b1707

Branch: refs/heads/trunk
Commit: 552b1707871fd92656debe33b03381f597be2578
Parents: 239dad1
Author: tedyu <yuzhihong@gmail.com>
Authored: Wed Sep 20 14:01:43 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Sep 20 14:01:43 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/AbstractHerder.java   | 20 ++++++++++++++--
 .../runtime/distributed/DistributedHerder.java  | 15 +++++++++---
 .../runtime/rest/entities/ConnectorInfo.java    | 10 +++++++-
 .../rest/entities/ConnectorStateInfo.java       | 10 +++++++-
 .../runtime/standalone/StandaloneHerder.java    |  8 ++++++-
 .../connect/runtime/AbstractHerderTest.java     |  4 ++++
 .../distributed/DistributedHerderTest.java      | 16 +++++++++----
 .../rest/resources/ConnectorsResourceTest.java  | 19 +++++++++++-----
 .../standalone/StandaloneHerderTest.java        | 24 +++++++++++++++-----
 9 files changed, 101 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/552b1707/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 607a724..fbe0ae2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
@@ -188,12 +189,17 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         return worker.getPlugins();
     }
 
+    /*
+     * Retrieves config map by connector name
+     */
+    protected abstract Map<String, String> config(String connName);
+
     @Override
     public ConnectorStateInfo connectorStatus(String connName) {
         ConnectorStatus connector = statusBackingStore.get(connName);
         if (connector == null)
             throw new NotFoundException("No status found for connector " + connName);
-
+        
         Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);
 
         ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(
@@ -207,7 +213,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
 
         Collections.sort(taskStates);
 
-        return new ConnectorStateInfo(connName, connectorState, taskStates);
+        Map<String, String> conf = config(connName);
+        return new ConnectorStateInfo(connName, connectorState, taskStates,
+            conf == null ? ConnectorType.UNKNOWN : connectorTypeForClass(conf.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
     }
 
     @Override
@@ -344,6 +352,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         }
     }
 
+    /*
+     * Retrieves ConnectorType for the corresponding connector class
+     * @param connClass class of the connector
+     */
+    public ConnectorType connectorTypeForClass(String connClass) {
+        return ConnectorType.from(getConnector(connClass).getClass());
+    }
+
     /**
      * Checks a given {@link ConfigInfos} for validation error messages and adds an exception
      * to the given {@link Callback} if any were found.

http://git-wip-us.apache.org/repos/asf/kafka/blob/552b1707/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 8f7503e..390f8c3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -430,7 +430,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                         if (!configState.contains(connName)) {
                             callback.onCompletion(new NotFoundException("Connector " + connName
+ " not found"), null);
                         } else {
-                            callback.onCompletion(null, new ConnectorInfo(connName, configState.connectorConfig(connName),
configState.tasks(connName)));
+                            Map<String, String> config = configState.connectorConfig(connName);
+                            callback.onCompletion(null, new ConnectorInfo(connName, config,
+                                configState.tasks(connName),
+                                connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))));
                         }
                         return null;
                     }
@@ -440,6 +443,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
+    protected Map<String, String> config(String connName) {
+        return configState.connectorConfig(connName);
+    }
+
+    @Override
     public void connectorConfig(String connName, final Callback<Map<String, String>>
callback) {
         // Subset of connectorInfo, so piggy back on that implementation
         log.trace("Submitting connector config read request {}", connName);
@@ -488,7 +496,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         if (connector instanceof SinkConnector) {
             ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG);
             String name = (String) validatedName.value();
-
             if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
                 validatedName.addErrorMessage("Consumer group for sink connector named "
+ name +
                         " conflicts with Connect worker group " + workerGroupId);
@@ -527,7 +534,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
                         // Note that we use the updated connector config despite the fact
that we don't have an updated
                         // snapshot yet. The existing task info should still be accurate.
-                        ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName));
+                        Map<String, String> map = configState.connectorConfig(connName);
+                        ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
+                            map == null ? null : connectorTypeForClass(map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
                         callback.onCompletion(null, new Created<>(!exists, info));
                         return null;
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/552b1707/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
index 9179d3b..9a10d74 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
@@ -30,14 +30,17 @@ public class ConnectorInfo {
     private final String name;
     private final Map<String, String> config;
     private final List<ConnectorTaskId> tasks;
+    private final ConnectorType type;
 
     @JsonCreator
     public ConnectorInfo(@JsonProperty("name") String name,
                          @JsonProperty("config") Map<String, String> config,
-                         @JsonProperty("tasks") List<ConnectorTaskId> tasks) {
+                         @JsonProperty("tasks") List<ConnectorTaskId> tasks,
+                         @JsonProperty("type") ConnectorType type) {
         this.name = name;
         this.config = config;
         this.tasks = tasks;
+        this.type = type;
     }
 
 
@@ -47,6 +50,11 @@ public class ConnectorInfo {
     }
 
     @JsonProperty
+    public ConnectorType type() {
+        return type;
+    }
+
+    @JsonProperty
     public Map<String, String> config() {
         return config;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/552b1707/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
index c19e20b..80192ca 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -28,14 +28,17 @@ public class ConnectorStateInfo {
     private final String name;
     private final ConnectorState connector;
     private final List<TaskState> tasks;
+    private final ConnectorType type;
 
     @JsonCreator
     public ConnectorStateInfo(@JsonProperty("name") String name,
                               @JsonProperty("connector") ConnectorState connector,
-                              @JsonProperty("tasks") List<TaskState> tasks) {
+                              @JsonProperty("tasks") List<TaskState> tasks,
+                              @JsonProperty("type") ConnectorType type) {
         this.name = name;
         this.connector = connector;
         this.tasks = tasks;
+        this.type = type;
     }
 
     @JsonProperty
@@ -53,6 +56,11 @@ public class ConnectorStateInfo {
         return tasks;
     }
 
+    @JsonProperty
+    public ConnectorType type() {
+        return type;
+    }
+
     public abstract static class AbstractState {
         private final String state;
         private final String trace;

http://git-wip-us.apache.org/repos/asf/kafka/blob/552b1707/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index d57e75f..5d8beab 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -110,7 +110,13 @@ public class StandaloneHerder extends AbstractHerder {
         if (!configState.contains(connector))
             return null;
         Map<String, String> config = configState.connectorConfig(connector);
-        return new ConnectorInfo(connector, config, configState.tasks(connector));
+        return new ConnectorInfo(connector, config, configState.tasks(connector),
+            connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
+    }
+
+    @Override
+    protected Map<String, String> config(String connName) {
+        return configState.connectorConfig(connName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/552b1707/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index c261ab6..ac9c312 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -69,6 +70,7 @@ public class AbstractHerderTest extends EasyMockSupport {
                 .createMock();
 
         EasyMock.expect(herder.generation()).andStubReturn(generation);
+        EasyMock.expect(herder.config(connector)).andReturn(null);
 
         EasyMock.expect(statusStore.get(connector))
                 .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId,
generation));
@@ -76,6 +78,7 @@ public class AbstractHerderTest extends EasyMockSupport {
         EasyMock.expect(statusStore.getAll(connector))
                 .andReturn(Collections.singletonList(
                         new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId,
generation)));
+        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
 
         replayAll();
 
@@ -191,6 +194,7 @@ public class AbstractHerderTest extends EasyMockSupport {
         config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
         config.put("required", "value"); // connector required config
         ConfigInfos result = herder.validateConnectorConfig(config);
+        assertEquals(herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)),
ConnectorType.SOURCE);
 
         // We expect there to be errors due to the missing name and .... Note that these
assertions depend heavily on
         // the config fields for SourceConnectorConfig, but we expect these to change rarely.

http://git-wip-us.apache.org/repos/asf/kafka/blob/552b1707/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index dcbd88f..40d0e2b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -168,12 +169,13 @@ public class DistributedHerderTest {
         EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);
         time = new MockTime();
 
-        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff",
"updateDeletedConnectorStatus"},
+        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff",
"connectorTypeForClass", "updateDeletedConnectorStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore,
configBackingStore, member, MEMBER_URL, time);
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener();
         plugins = PowerMock.createMock(Plugins.class);
+        EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName())).andReturn(ConnectorType.SOURCE).anyTimes();
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
         PowerMock.mockStatic(Plugins.class);
@@ -330,7 +332,8 @@ public class DistributedHerderTest {
         // CONN2 is new, should succeed
         configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
         PowerMock.expectLastCall();
-        ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.<ConnectorTaskId>emptyList());
+        ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.<ConnectorTaskId>emptyList(),
+            ConnectorType.SOURCE);
         putConnectorCallback.onCompletion(null, new Herder.Created<>(true, info));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
@@ -1207,6 +1210,7 @@ public class DistributedHerderTest {
     @Test
     public void testAccessors() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
 
@@ -1233,7 +1237,8 @@ public class DistributedHerderTest {
         assertTrue(listConnectorsCb.isDone());
         assertEquals(Collections.singleton(CONN1), listConnectorsCb.get());
         assertTrue(connectorInfoCb.isDone());
-        ConnectorInfo info = new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0,
TASK1, TASK2));
+        ConnectorInfo info = new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0,
TASK1, TASK2),
+            ConnectorType.SOURCE);
         assertEquals(info, connectorInfoCb.get());
         assertTrue(connectorConfigCb.isDone());
         assertEquals(CONN1_CONFIG, connectorConfigCb.get());
@@ -1270,7 +1275,7 @@ public class DistributedHerderTest {
 
         // config validation
         Connector connectorMock = PowerMock.createMock(Connector.class);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(5);
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
         EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
@@ -1320,7 +1325,8 @@ public class DistributedHerderTest {
         herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, putConfigCb);
         herder.tick();
         assertTrue(putConfigCb.isDone());
-        ConnectorInfo updatedInfo = new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0,
TASK1, TASK2));
+        ConnectorInfo updatedInfo = new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0,
TASK1, TASK2),
+            ConnectorType.SOURCE);
         assertEquals(new Herder.Created<>(false, updatedInfo), putConfigCb.get());
 
         // Check config again to validate change

http://git-wip-us.apache.org/repos/asf/kafka/blob/552b1707/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 0b20271..f1bc826 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.util.Callback;
@@ -149,7 +150,8 @@ public class ConnectorsResourceTest {
 
         final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
         herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()),
EasyMock.eq(false), EasyMock.capture(cb));
-        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME,
CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME,
CONNECTOR_CONFIG,
+            CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
 
         PowerMock.replayAll();
 
@@ -167,7 +169,8 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
         EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(201, new HashMap<String,
List<String>>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+                .andReturn(new RestServer.HttpResponse<>(201, new HashMap<String,
List<String>>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
+                    ConnectorType.SOURCE)));
 
         PowerMock.replayAll();
 
@@ -201,7 +204,8 @@ public class ConnectorsResourceTest {
 
         final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
         herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()),
EasyMock.eq(false), EasyMock.capture(cb));
-        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME,
CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME,
CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
+            ConnectorType.SOURCE)));
 
         PowerMock.replayAll();
 
@@ -257,12 +261,14 @@ public class ConnectorsResourceTest {
     public void testGetConnector() throws Throwable {
         final Capture<Callback<ConnectorInfo>> cb = Capture.newInstance();
         herder.connectorInfo(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
-        expectAndCallbackResult(cb, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES));
+        expectAndCallbackResult(cb, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
+            ConnectorType.SOURCE));
 
         PowerMock.replayAll();
 
         ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME, FORWARD);
-        assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES),
connInfo);
+        assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE),
+            connInfo);
 
         PowerMock.verifyAll();
     }
@@ -298,7 +304,8 @@ public class ConnectorsResourceTest {
     public void testPutConnectorConfig() throws Throwable {
         final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
         herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(CONNECTOR_CONFIG),
EasyMock.eq(true), EasyMock.capture(cb));
-        expectAndCallbackResult(cb, new Herder.Created<>(false, new ConnectorInfo(CONNECTOR_NAME,
CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
+        expectAndCallbackResult(cb, new Herder.Created<>(false, new ConnectorInfo(CONNECTOR_NAME,
CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
+            ConnectorType.SINK)));
 
         PowerMock.replayAll();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/552b1707/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index c58d702..1cd1804 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -34,10 +34,12 @@ import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConnector;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -78,7 +80,7 @@ import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
 @SuppressWarnings("unchecked")
-@PrepareForTest({StandaloneHerder.class, Plugins.class})
+@PrepareForTest({StandaloneHerder.class, Plugins.class, WorkerConnector.class})
 public class StandaloneHerderTest {
     private static final String CONNECTOR_NAME = "test";
     private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
@@ -104,11 +106,14 @@ public class StandaloneHerderTest {
 
     @Before
     public void setup() {
-        herder = new StandaloneHerder(worker, WORKER_ID, statusBackingStore, new MemoryConfigBackingStore());
+        worker = PowerMock.createMock(Worker.class);
+        herder = PowerMock.createPartialMock(StandaloneHerder.class, new String[]{"connectorTypeForClass"},
+            worker, WORKER_ID, statusBackingStore, new MemoryConfigBackingStore());
         plugins = PowerMock.createMock(Plugins.class);
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
         PowerMock.mockStatic(Plugins.class);
+        PowerMock.mockStatic(WorkerConnector.class);
     }
 
     @Test
@@ -217,7 +222,6 @@ public class StandaloneHerderTest {
 
         Map<String, String> config = connectorConfig(SourceSink.SINK);
         expectConfigValidation(config);
-
         PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
@@ -419,7 +423,8 @@ public class StandaloneHerderTest {
         // Validate accessors with 1 connector
         listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME));
         EasyMock.expectLastCall();
-        ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new
ConnectorTaskId(CONNECTOR_NAME, 0)));
+        ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new
ConnectorTaskId(CONNECTOR_NAME, 0)),
+            ConnectorType.SOURCE);
         connectorInfoCb.onCompletion(null, connInfo);
         EasyMock.expectLastCall();
         connectorConfigCb.onCompletion(null, connConfig);
@@ -478,13 +483,15 @@ public class StandaloneHerderTest {
                 .andReturn(singletonList(taskConfig(SourceSink.SOURCE)));
         worker.isSinkConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(false);
-        ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new
ConnectorTaskId(CONNECTOR_NAME, 0)));
+        ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new
ConnectorTaskId(CONNECTOR_NAME, 0)),
+            ConnectorType.SOURCE);
         putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, newConnInfo));
         EasyMock.expectLastCall();
         // Should get new config
         expectConfigValidation(connectorMock, false, newConnConfig);
         connectorConfigCb.onCompletion(null, newConnConfig);
         EasyMock.expectLastCall();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
 
         PowerMock.replayAll();
 
@@ -561,7 +568,8 @@ public class StandaloneHerderTest {
         EasyMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
 
-        ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new
ConnectorTaskId(CONNECTOR_NAME, 0)));
+        ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new
ConnectorTaskId(CONNECTOR_NAME, 0)),
+            SourceSink.SOURCE == sourceSink ? ConnectorType.SOURCE : ConnectorType.SINK);
         createCallback.onCompletion(null, new Herder.Created<>(true, connInfo));
         EasyMock.expectLastCall();
 
@@ -575,6 +583,10 @@ public class StandaloneHerderTest {
         worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink),
generatedTaskProps, herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(true);
 
+        EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName()))
+            .andReturn(ConnectorType.SOURCE).anyTimes();
+        EasyMock.expect(herder.connectorTypeForClass(BogusSinkConnector.class.getName()))
+        .andReturn(ConnectorType.SINK).anyTimes();
         worker.isSinkConnector(CONNECTOR_NAME);
         PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK);
     }


Mime
View raw message