kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-8404: Add HttpHeader to RestClient HTTP Request and Connector REST API (#6791)
Date Tue, 04 Jun 2019 02:43:29 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 42fd2c3  KAFKA-8404: Add HttpHeader to RestClient HTTP Request and Connector REST
API (#6791)
42fd2c3 is described below

commit 42fd2c310ed804842eaefed33758ff52bbec0f4b
Author: Hai-Dang Dam <damquanghaidang@gmail.com>
AuthorDate: Mon Jun 3 19:06:00 2019 -0700

    KAFKA-8404: Add HttpHeader to RestClient HTTP Request and Connector REST API (#6791)
    
    When Connect forwards a REST request from one worker to another, the Authorization header
was not forwarded. This commit changes the Connect framework to add include the authorization
header when forwarding requests to other workers.
    
    Author: Hai-Dang Dam <damquanghaidang@gmail.com>
    Reviewers: Robert Yokota <rayokota@gmail.com>, Randall Hauch <rhauch@gmail.com>
    
    # Conflicts:
    #	connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
    #	connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
    
    # Conflicts:
    #	connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
---
 .../basic/auth/extension/JaasBasicAuthFilter.java  |  16 +--
 .../auth/extension/JaasBasicAuthFilterTest.java    |  34 +++++-
 .../runtime/distributed/DistributedHerder.java     |  11 +-
 .../kafka/connect/runtime/rest/RestClient.java     |  21 +++-
 .../runtime/rest/resources/ConnectorsResource.java |  49 +++++---
 .../resources/ConnectorPluginsResourceTest.java    |   3 +-
 .../rest/resources/ConnectorsResourceTest.java     | 123 ++++++++++++++-------
 7 files changed, 185 insertions(+), 72 deletions(-)

diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
index 6167434..d5b15c6 100644
--- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.connect.rest.basic.auth.extension;
 
+import java.util.regex.Pattern;
+import javax.ws.rs.HttpMethod;
 import org.apache.kafka.common.config.ConfigException;
 
 import java.io.IOException;
@@ -35,18 +37,18 @@ import javax.ws.rs.container.ContainerRequestFilter;
 import javax.ws.rs.core.Response;
 
 public class JaasBasicAuthFilter implements ContainerRequestFilter {
-
     private static final String CONNECT_LOGIN_MODULE = "KafkaConnect";
     static final String AUTHORIZATION = "Authorization";
-
+    private static final Pattern TASK_REQUEST_PATTERN = Pattern.compile("/?connectors/([^/]+)/tasks/?");
     @Override
     public void filter(ContainerRequestContext requestContext) throws IOException {
-
         try {
-            LoginContext loginContext =
-                new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
-                    requestContext.getHeaderString(AUTHORIZATION)));
-            loginContext.login();
+            if (!(requestContext.getMethod().equals(HttpMethod.POST) && TASK_REQUEST_PATTERN.matcher(requestContext.getUriInfo().getPath()).matches()))
{
+                LoginContext loginContext =
+                    new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
+                        requestContext.getHeaderString(AUTHORIZATION)));
+                loginContext.login();
+            }
         } catch (LoginException | ConfigException e) {
             requestContext.abortWith(
                 Response.status(Response.Status.UNAUTHORIZED)
diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
index d61fc06..c81f8f6 100644
--- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
+++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.kafka.connect.rest.basic.auth.extension;
 
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.UriInfo;
 import org.apache.kafka.common.security.JaasUtils;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -52,6 +55,9 @@ public class JaasBasicAuthFilterTest {
     private String previousJaasConfig;
     private Configuration previousConfiguration;
 
+    @MockStrict
+    private UriInfo uriInfo;
+
     @Before
     public void setup() throws IOException {
         EasyMock.reset(requestContext);
@@ -137,7 +143,34 @@ public class JaasBasicAuthFilterTest {
         jaasBasicAuthFilter.filter(requestContext);
     }
 
+    @Test
+    public void testPostWithoutAppropriateCredential() throws IOException {
+        EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
+        EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
+        EasyMock.expect(uriInfo.getPath()).andReturn("connectors/connName/tasks");
+
+        PowerMock.replayAll();
+        jaasBasicAuthFilter.filter(requestContext);
+        EasyMock.verify(requestContext);
+    }
+
+    @Test
+    public void testPostNotChangingConnectorTask() throws IOException {
+        EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
+        EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
+        EasyMock.expect(uriInfo.getPath()).andReturn("local:randomport/connectors/connName");
+        String authHeader = "Basic" + Base64.getEncoder().encodeToString(("user" + ":" +
"password").getBytes());
+        EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
+            .andReturn(authHeader);
+        requestContext.abortWith(EasyMock.anyObject(Response.class));
+        EasyMock.expectLastCall();
+        PowerMock.replayAll();
+        jaasBasicAuthFilter.filter(requestContext);
+        EasyMock.verify(requestContext);
+    }
+
     private void setMock(String authorization, String username, String password, boolean
exceptionCase) {
+        EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.GET);
         String authHeader = authorization + " " + Base64.getEncoder().encodeToString((username
+ ":" + password).getBytes());
         EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
             .andReturn(authHeader);
@@ -152,7 +185,6 @@ public class JaasBasicAuthFilterTest {
         File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
         jaasConfigFile.deleteOnExit();
         previousJaasConfig = System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
-
         List<String> lines;
         lines = new ArrayList<>();
         lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule
required ");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 7edc3b2..b229102 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1023,8 +1023,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable
{
                         @Override
                         public void run() {
                             try {
-                                String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/"
+ connName + "/tasks");
-                                RestClient.httpRequest(reconfigUrl, "POST", rawTaskProps,
null, config);
+                                String leaderUrl = leaderUrl();
+                                if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
+                                    cb.onCompletion(new ConnectException("Request to leader
to " +
+                                            "reconfigure connector tasks failed " +
+                                            "because the URL of the leader's REST interface
is empty!"), null);
+                                    return;
+                                }
+                                String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/"
+ connName + "/tasks");
+                                RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps,
null, config);
                                 cb.onCompletion(null, null);
                             } catch (ConnectException e) {
                                 log.error("Request to leader to reconfigure connector tasks
failed", e);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 15e8418..de11f26 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -50,12 +51,13 @@ public class RestClient {
      *
      * @param url             HTTP connection will be established with this url.
      * @param method          HTTP method ("GET", "POST", "PUT", etc.)
+     * @param headers         HTTP headers from REST endpoint
      * @param requestBodyData Object to serialize as JSON and send in the request body.
      * @param responseFormat  Expected format of the response to the HTTP request.
      * @param <T>             The type of the deserialized response to the HTTP request.
      * @return The deserialized response to the HTTP request, or null if no data is expected.
      */
-    public static <T> HttpResponse<T> httpRequest(String url, String method,
Object requestBodyData,
+    public static <T> HttpResponse<T> httpRequest(String url, String method,
HttpHeaders headers, Object requestBodyData,
                                                   TypeReference<T> responseFormat,
WorkerConfig config) {
         HttpClient client;
 
@@ -82,6 +84,8 @@ public class RestClient {
             req.method(method);
             req.accept("application/json");
             req.agent("kafka-connect");
+            addHeadersToRequest(headers, req);
+
             if (serializedBody != null) {
                 req.content(new StringContentProvider(serializedBody, StandardCharsets.UTF_8),
"application/json");
             }
@@ -116,6 +120,21 @@ public class RestClient {
         }
     }
 
+
+    /**
+     * Extract headers from REST call and add to client request
+     * @param headers         Headers from REST endpoint
+     * @param req             The client request to modify
+     */
+    private static void addHeadersToRequest(HttpHeaders headers, Request req) {
+        if (headers != null) {
+            String credentialAuthorization = headers.getHeaderString(HttpHeaders.AUTHORIZATION);
+            if (credentialAuthorization != null) {
+                req.header(HttpHeaders.AUTHORIZATION, credentialAuthorization);
+            }
+        }
+    }
+
     /**
      * Convert response parameters from Jetty format (HttpFields)
      * @param httpFields
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 4a04512..26a09ea 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -44,6 +46,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
@@ -79,16 +82,18 @@ public class ConnectorsResource {
 
     @GET
     @Path("/")
-    public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward)
throws Throwable {
+    public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward,
+                                             final @Context HttpHeaders headers) throws Throwable
{
         FutureCallback<Collection<String>> cb = new FutureCallback<>();
         herder.connectors(cb);
-        return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>()
{
+        return completeOrForwardRequest(cb, "/connectors", "GET", headers, null, new TypeReference<Collection<String>>()
{
         }, forward);
     }
 
     @POST
     @Path("/")
     public Response createConnector(final @QueryParam("forward") Boolean forward,
+                                    final @Context HttpHeaders headers,
                                     final CreateConnectorRequest createRequest) throws Throwable
{
         // Trim leading and trailing whitespaces from the connector name, replace null with
empty string
         // if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars
@@ -100,7 +105,7 @@ public class ConnectorsResource {
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
         herder.putConnectorConfig(name, configs, false, cb);
-        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors",
"POST", createRequest,
+        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors",
"POST", headers, createRequest,
                 new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(),
forward);
 
         URI location = UriBuilder.fromUri("/connectors").path(name).build();
@@ -110,19 +115,21 @@ public class ConnectorsResource {
     @GET
     @Path("/{connector}")
     public ConnectorInfo getConnector(final @PathParam("connector") String connector,
+                                      final @Context HttpHeaders headers,
                                       final @QueryParam("forward") Boolean forward) throws
Throwable {
         FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
         herder.connectorInfo(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward);
+        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null,
forward);
     }
 
     @GET
     @Path("/{connector}/config")
     public Map<String, String> getConnectorConfig(final @PathParam("connector") String
connector,
+                                                  final @Context HttpHeaders headers,
                                                   final @QueryParam("forward") Boolean forward)
throws Throwable {
         FutureCallback<Map<String, String>> cb = new FutureCallback<>();
         herder.connectorConfig(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET",
null, forward);
+        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET",
headers, null, forward);
     }
 
     @GET
@@ -134,6 +141,7 @@ public class ConnectorsResource {
     @PUT
     @Path("/{connector}/config")
     public Response putConnectorConfig(final @PathParam("connector") String connector,
+                                       final @Context HttpHeaders headers,
                                        final @QueryParam("forward") Boolean forward,
                                        final Map<String, String> connectorConfig) throws
Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
@@ -141,7 +149,7 @@ public class ConnectorsResource {
 
         herder.putConnectorConfig(connector, connectorConfig, true, cb);
         Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/"
+ connector + "/config",
-                "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new
CreatedConnectorInfoTranslator(), forward);
+                "PUT", headers, connectorConfig, new TypeReference<ConnectorInfo>()
{ }, new CreatedConnectorInfoTranslator(), forward);
         Response.ResponseBuilder response;
         if (createdInfo.created()) {
             URI location = UriBuilder.fromUri("/connectors").path(connector).build();
@@ -155,15 +163,16 @@ public class ConnectorsResource {
     @POST
     @Path("/{connector}/restart")
     public void restartConnector(final @PathParam("connector") String connector,
+                                 final @Context HttpHeaders headers,
                                  final @QueryParam("forward") Boolean forward) throws Throwable
{
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(connector, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null,
forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers,
null, forward);
     }
 
     @PUT
     @Path("/{connector}/pause")
-    public Response pauseConnector(@PathParam("connector") String connector) {
+    public Response pauseConnector(@PathParam("connector") String connector, final @Context
HttpHeaders headers) {
         herder.pauseConnector(connector);
         return Response.accepted().build();
     }
@@ -178,26 +187,29 @@ public class ConnectorsResource {
     @GET
     @Path("/{connector}/tasks")
     public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector,
+                                         final @Context HttpHeaders headers,
                                          final @QueryParam("forward") Boolean forward) throws
Throwable {
         FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
         herder.taskConfigs(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET",
null, new TypeReference<List<TaskInfo>>() {
+        return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET",
headers, null, new TypeReference<List<TaskInfo>>() {
         }, forward);
     }
 
     @POST
     @Path("/{connector}/tasks")
     public void putTaskConfigs(final @PathParam("connector") String connector,
+                               final @Context HttpHeaders headers,
                                final @QueryParam("forward") Boolean forward,
                                final List<Map<String, String>> taskConfigs) throws
Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.putTaskConfigs(connector, taskConfigs, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs,
forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers,
taskConfigs, forward);
     }
 
     @GET
     @Path("/{connector}/tasks/{task}/status")
     public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String
connector,
+                                                      final @Context HttpHeaders headers,
                                                       final @PathParam("task") Integer task)
throws Throwable {
         return herder.taskStatus(new ConnectorTaskId(connector, task));
     }
@@ -206,20 +218,22 @@ public class ConnectorsResource {
     @Path("/{connector}/tasks/{task}/restart")
     public void restartTask(final @PathParam("connector") String connector,
                             final @PathParam("task") Integer task,
+                            final @Context HttpHeaders headers,
                             final @QueryParam("forward") Boolean forward) throws Throwable
{
         FutureCallback<Void> cb = new FutureCallback<>();
         ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
         herder.restartTask(taskId, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart",
"POST", null, forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart",
"POST", headers, null, forward);
     }
 
     @DELETE
     @Path("/{connector}")
     public void destroyConnector(final @PathParam("connector") String connector,
+                                 final @Context HttpHeaders headers,
                                  final @QueryParam("forward") Boolean forward) throws Throwable
{
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
         herder.deleteConnectorConfig(connector, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null,
forward);
     }
 
     // Check whether the connector name from the url matches the one (if there is one) provided
in the connectorconfig
@@ -239,6 +253,7 @@ public class ConnectorsResource {
     private <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
                                               String path,
                                               String method,
+                                              HttpHeaders headers,
                                               Object body,
                                               TypeReference<U> resultType,
                                               Translator<T, U> translator,
@@ -261,7 +276,7 @@ public class ConnectorsResource {
                             .build()
                             .toString();
                     log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
-                    return translator.translate(RestClient.httpRequest(forwardUrl, method,
body, resultType, config));
+                    return translator.translate(RestClient.httpRequest(forwardUrl, method,
headers, body, resultType, config));
                 } else {
                     // we should find the right target for the query within two hops, so
if
                     // we don't, it probably means that a rebalance has taken place.
@@ -283,14 +298,14 @@ public class ConnectorsResource {
         }
     }
 
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path,
String method, Object body,
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path,
String method, HttpHeaders headers, Object body,
                                            TypeReference<T> resultType, Boolean forward)
throws Throwable {
-        return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>(),
forward);
+        return completeOrForwardRequest(cb, path, method, headers, body, resultType, new
IdentityTranslator<T>(), forward);
     }
 
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path,
String method,
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path,
String method, HttpHeaders headers,
                                            Object body, Boolean forward) throws Throwable
{
-        return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>(),
forward);
+        return completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator<T>(),
forward);
     }
 
     private interface Translator<T, U> {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index ad360b6..67cae67 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -179,7 +180,7 @@ public class ConnectorPluginsResourceTest {
     @Before
     public void setUp() throws Exception {
         PowerMock.mockStatic(RestClient.class,
-                RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class,
TypeReference.class, WorkerConfig.class));
+                RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class,
Object.class, TypeReference.class, WorkerConfig.class));
 
         plugins = PowerMock.createMock(Plugins.class);
         herder = PowerMock.createMock(AbstractHerder.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index f84cd25..ba5a2c3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -75,6 +76,7 @@ public class ConnectorsResourceTest {
     private static final String CONNECTOR_NAME_PADDING_WHITESPACES = "   " + CONNECTOR_NAME
+ "  \n  ";
     private static final Boolean FORWARD = true;
     private static final Map<String, String> CONNECTOR_CONFIG_SPECIAL_CHARS = new HashMap<>();
+    private static final HttpHeaders NULL_HEADERS = null;
     static {
         CONNECTOR_CONFIG_SPECIAL_CHARS.put("name", CONNECTOR_NAME_SPECIAL_CHARS);
         CONNECTOR_CONFIG_SPECIAL_CHARS.put("sample_config", "test_config");
@@ -125,7 +127,7 @@ public class ConnectorsResourceTest {
     @Before
     public void setUp() throws NoSuchMethodException {
         PowerMock.mockStatic(RestClient.class,
-                RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class,
TypeReference.class, WorkerConfig.class));
+                RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class,
Object.class, TypeReference.class, WorkerConfig.class));
         connectorsResource = new ConnectorsResource(herder, null);
     }
 
@@ -142,7 +144,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        Collection<String> connectors = connectorsResource.listConnectors(FORWARD);
+        Collection<String> connectors = connectorsResource.listConnectors(FORWARD,
NULL_HEADERS);
         // Ordering isn't guaranteed, compare sets
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)),
new HashSet<>(connectors));
 
@@ -156,15 +158,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
         EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
EasyMock.eq("GET"),
-                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(TypeReference.class),
EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(200, new HashMap<String,
String>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
 
         PowerMock.replayAll();
 
-        Collection<String> connectors = connectorsResource.listConnectors(FORWARD);
-        // Ordering isn't guaranteed, compare sets
-        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)),
new HashSet<>(connectors));
-
+        Collection<String> connectors = connectorsResource.listConnectors(FORWARD,
NULL_HEADERS);
         PowerMock.verifyAll();
     }
 
@@ -177,7 +176,7 @@ public class ConnectorsResourceTest {
         PowerMock.replayAll();
 
         // throws
-        connectorsResource.listConnectors(FORWARD);
+        connectorsResource.listConnectors(FORWARD, NULL_HEADERS);
     }
 
     @Test
@@ -191,7 +190,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, body);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
 
         PowerMock.verifyAll();
     }
@@ -204,19 +203,57 @@ public class ConnectorsResourceTest {
         herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()),
EasyMock.eq(false), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(),
EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(201, new HashMap<String,
String>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
                     ConnectorType.SOURCE)));
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, body);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
 
         PowerMock.verifyAll();
 
 
     }
 
+    @Test
+    public void testCreateConnectorWithHeaderAuthorization() throws Throwable {
+        CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME));
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class);
+        EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn("Basic YWxhZGRpbjpvcGVuc2VzYW1l").times(1);
+        EasyMock.replay(httpHeaders);
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()),
EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME,
CONNECTOR_CONFIG,
+            CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(FORWARD, httpHeaders, body);
+
+        PowerMock.verifyAll();
+    }
+
+
+
+    @Test
+    public void testCreateConnectorWithoutHeaderAuthorization() throws Throwable {
+        CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME));
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class);
+        EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn(null).times(1);
+        EasyMock.replay(httpHeaders);
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()),
EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME,
CONNECTOR_CONFIG,
+            CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(FORWARD, httpHeaders, body);
+
+        PowerMock.verifyAll();
+    }
+
     @Test(expected = AlreadyExistsException.class)
     public void testCreateConnectorExists() throws Throwable {
         CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME));
@@ -227,7 +264,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, body);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
 
         PowerMock.verifyAll();
     }
@@ -246,7 +283,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, bodyIn);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
 
         PowerMock.verifyAll();
     }
@@ -265,7 +302,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, bodyIn);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
 
         PowerMock.verifyAll();
     }
@@ -284,7 +321,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, bodyIn);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
 
         PowerMock.verifyAll();
     }
@@ -297,7 +334,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -308,12 +345,12 @@ public class ConnectorsResourceTest {
         herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME
+ "?forward=false", "DELETE", null, null, null))
+        EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME
+ "?forward=false", "DELETE", NULL_HEADERS, null, null, null))
                 .andReturn(new RestClient.HttpResponse<>(204, new HashMap<String,
String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -327,7 +364,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -341,7 +378,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME, FORWARD);
+        ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME, NULL_HEADERS,
FORWARD);
         assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE),
             connInfo);
 
@@ -356,7 +393,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME,
FORWARD);
+        Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME,
NULL_HEADERS, FORWARD);
         assertEquals(CONNECTOR_CONFIG, connConfig);
 
         PowerMock.verifyAll();
@@ -370,7 +407,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
+        connectorsResource.getConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -384,7 +421,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, CONNECTOR_CONFIG);
+        connectorsResource.putConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG);
 
         PowerMock.verifyAll();
     }
@@ -400,7 +437,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = connectorsResource.createConnector(FORWARD, body).getLocation().toString();
+        String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + CONNECTOR_NAME_SPECIAL_CHARS, decoded);
 
@@ -418,7 +455,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = connectorsResource.createConnector(FORWARD, body).getLocation().toString();
+        String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
 
@@ -435,7 +472,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_SPECIAL_CHARS,
FORWARD, CONNECTOR_CONFIG_SPECIAL_CHARS).getLocation().toString();
+        String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_SPECIAL_CHARS,
NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_SPECIAL_CHARS).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + CONNECTOR_NAME_SPECIAL_CHARS, decoded);
 
@@ -452,7 +489,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1,
FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
+        String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1,
NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
 
@@ -463,7 +500,7 @@ public class ConnectorsResourceTest {
     public void testPutConnectorConfigNameMismatch() throws Throwable {
         Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG);
         connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
-        connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, connConfig);
+        connectorsResource.putConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, connConfig);
     }
 
     @Test(expected = BadRequestException.class)
@@ -471,7 +508,7 @@ public class ConnectorsResourceTest {
         Map<String, String> connConfig = new HashMap<>();
         connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
         CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig);
-        connectorsResource.createConnector(FORWARD, request);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, request);
     }
 
     @Test
@@ -482,7 +519,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME,
FORWARD);
+        List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME,
NULL_HEADERS, FORWARD);
         assertEquals(TASK_INFOS, taskInfos);
 
         PowerMock.verifyAll();
@@ -496,7 +533,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
+        connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -509,7 +546,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, TASK_CONFIGS);
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, TASK_CONFIGS);
 
         PowerMock.verifyAll();
     }
@@ -522,7 +559,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, TASK_CONFIGS);
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, TASK_CONFIGS);
 
         PowerMock.verifyAll();
     }
@@ -535,7 +572,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.restartConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -547,12 +584,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
 
         EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/"
+ CONNECTOR_NAME + "/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(),
EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(),
EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String,
String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartConnector(CONNECTOR_NAME, null);
+        connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, null);
 
         PowerMock.verifyAll();
     }
@@ -565,12 +602,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
 
         EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/"
+ CONNECTOR_NAME + "/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(),
EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(),
EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String,
String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartConnector(CONNECTOR_NAME, true);
+        connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, true);
 
         PowerMock.verifyAll();
     }
@@ -584,7 +621,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.restartTask(CONNECTOR_NAME, 0, FORWARD);
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -598,12 +635,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
 
         EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/"
+ CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(),
EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(),
EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String,
String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartTask(CONNECTOR_NAME, 0, null);
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, null);
 
         PowerMock.verifyAll();
     }
@@ -618,12 +655,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
 
         EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/"
+ CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(),
EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(),
EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String,
String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartTask(CONNECTOR_NAME, 0, true);
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, true);
 
         PowerMock.verifyAll();
     }


Mime
View raw message