kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-8945/KAFKA-8947 backport (#7524)
Date Wed, 16 Oct 2019 15:53:20 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 3ab189f  KAFKA-8945/KAFKA-8947 backport (#7524)
3ab189f is described below

commit 3ab189f29fe76b213026dc8fe552964d84edd40b
Author: Chris Egerton <chrise@confluent.io>
AuthorDate: Wed Oct 16 08:49:04 2019 -0700

    KAFKA-8945/KAFKA-8947 backport (#7524)
    
    Fix bug in Connect REST extension API caused by invalid constructor parameter validation, and update integration test to play nicely with Jenkins
    
    Fix instantiation of TaskState objects by Connect framework.
    
    Author: Chris Egerton <chrise@confluent.io>
    Reviewers: Magesh Nandakumar <mageshn@confluent.io>, Randall Hauch <rhauch@gmail.com>
---
 .../apache/kafka/connect/health/AbstractState.java |  23 ++++-
 .../kafka/connect/health/ConnectorHealth.java      |  29 +++++-
 .../kafka/connect/health/ConnectorState.java       |   9 ++
 .../org/apache/kafka/connect/health/TaskState.java |  22 +++--
 .../runtime/health/ConnectClusterStateImpl.java    |   2 +-
 .../integration/RestExtensionIntegrationTest.java  | 104 +++++++++++++++++++--
 6 files changed, 169 insertions(+), 20 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
index a18c463..f707b3c 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.connect.health;
 
+import java.util.Objects;
+
 /**
  * Provides the current status along with identifier for Connect worker and tasks.
  */
@@ -34,10 +36,10 @@ public abstract class AbstractState {
      * @param traceMessage  any error trace message associated with the connector or the task; may be null or empty
      */
     public AbstractState(String state, String workerId, String traceMessage) {
-        if (state != null && !state.trim().isEmpty()) {
+        if (state == null || state.trim().isEmpty()) {
             throw new IllegalArgumentException("State must not be null or empty");
         }
-        if (workerId != null && !workerId.trim().isEmpty()) {
+        if (workerId == null || workerId.trim().isEmpty()) {
             throw new IllegalArgumentException("Worker ID must not be null or empty");
         }
         this.state = state;
@@ -71,4 +73,21 @@ public abstract class AbstractState {
     public String traceMessage() {
         return traceMessage;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        AbstractState that = (AbstractState) o;
+        return state.equals(that.state)
+            && Objects.equals(traceMessage, that.traceMessage)
+            && workerId.equals(that.workerId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(state, traceMessage, workerId);
+    }
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
index 3a9efd1..12fa6b7 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
@@ -35,7 +35,7 @@ public class ConnectorHealth {
                            ConnectorState connectorState,
                            Map<Integer, TaskState> tasks,
                            ConnectorType type) {
-        if (name != null && !name.trim().isEmpty()) {
+        if (name == null || name.trim().isEmpty()) {
             throw new IllegalArgumentException("Connector name is required");
         }
         Objects.requireNonNull(connectorState, "connectorState can't be null");
@@ -83,4 +83,31 @@ public class ConnectorHealth {
         return type;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        ConnectorHealth that = (ConnectorHealth) o;
+        return name.equals(that.name)
+            && connectorState.equals(that.connectorState)
+            && tasks.equals(that.tasks)
+            && type == that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, connectorState, tasks, type);
+    }
+
+    @Override
+    public String toString() {
+        return "ConnectorHealth{"
+            + "name='" + name + '\''
+            + ", connectorState=" + connectorState
+            + ", tasks=" + tasks
+            + ", type=" + type
+            + '}';
+    }
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java
index d5571bc..6304426 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java
@@ -32,4 +32,13 @@ public class ConnectorState extends AbstractState {
     public ConnectorState(String state, String workerId, String traceMessage) {
         super(state, workerId, traceMessage);
     }
+
+    @Override
+    public String toString() {
+        return "ConnectorState{"
+            + "state='" + state() + '\''
+            + ", traceMessage='" + traceMessage() + '\''
+            + ", workerId='" + workerId() + '\''
+            + '}';
+    }
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java
index 1c1be15..ae78a5f 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java
@@ -50,20 +50,28 @@ public class TaskState extends AbstractState {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) {
+        if (this == o)
             return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
+        if (o == null || getClass() != o.getClass())
+            return false;
+        if (!super.equals(o))
             return false;
-        }
-
         TaskState taskState = (TaskState) o;
-
         return taskId == taskState.taskId;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(taskId);
+        return Objects.hash(super.hashCode(), taskId);
+    }
+
+    @Override
+    public String toString() {
+        return "TaskState{"
+            + "taskId='" + taskId + '\''
+            + "state='" + state() + '\''
+            + ", traceMessage='" + traceMessage() + '\''
+            + ", workerId='" + workerId() + '\''
+            + '}';
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index e3a4833..4384212 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -82,7 +82,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
         for (ConnectorStateInfo.TaskState state : states) {
             taskStates.put(
                 state.id(),
-                new TaskState(state.id(), state.workerId(), state.state(), state.trace())
+                new TaskState(state.id(), state.state(), state.workerId(), state.trace())
             );
         }
         return taskStates;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index d4cac39..a3e8147 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -16,10 +16,16 @@
  */
 package org.apache.kafka.connect.integration;
 
+import org.apache.kafka.connect.health.ConnectClusterState;
+import org.apache.kafka.connect.health.ConnectorHealth;
+import org.apache.kafka.connect.health.ConnectorState;
+import org.apache.kafka.connect.health.ConnectorType;
+import org.apache.kafka.connect.health.TaskState;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.WorkerHandle;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Test;
@@ -28,12 +34,18 @@ import org.junit.experimental.categories.Category;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.REST_EXTENSION_CLASSES_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
 
 /**
  * A simple integration test to ensure that REST extensions are registered correctly.
@@ -41,39 +53,86 @@ import static org.apache.kafka.test.TestUtils.waitForCondition;
 @Category(IntegrationTest.class)
 public class RestExtensionIntegrationTest {
 
-    private static final int NUM_WORKERS = 3;
     private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
+    private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
 
     private EmbeddedConnectCluster connect;
 
     @Test
-    public void testImmediateRequestForListOfConnectors() throws IOException, InterruptedException {
+    public void testRestExtensionApi() throws IOException, InterruptedException {
         // setup Connect worker properties
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName());
-  
+
         // build a Connect cluster backed by Kafka and Zk
         connect = new EmbeddedConnectCluster.Builder()
-                .name("connect-cluster")
-                .numWorkers(NUM_WORKERS)
-                .numBrokers(1)
-                .workerProps(workerProps)
-                .build();
-  
+            .name("connect-cluster")
+            .numWorkers(1)
+            .numBrokers(1)
+            .workerProps(workerProps)
+            .build();
+
         // start the clusters
         connect.start();
 
+        WorkerHandle worker = connect.workers().stream()
+            .findFirst()
+            .orElseThrow(() -> new AssertionError("At least one worker handle should be available"));
+
         waitForCondition(
             this::extensionIsRegistered,
             REST_EXTENSION_REGISTRATION_TIMEOUT_MS,
             "REST extension was never registered"
         );
+
+        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle("test-conn");
+        try {
+            // setup up props for the connector
+            Map<String, String> connectorProps = new HashMap<>();
+            connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
+            connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(1));
+            connectorProps.put(TOPICS_CONFIG, "test-topic");
+
+            // start a connector
+            connectorHandle.taskHandle(connectorHandle.name() + "-0");
+            StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1);
+            connect.configureConnector(connectorHandle.name(), connectorProps);
+            connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+            String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort());
+            ConnectorHealth expectedHealth = new ConnectorHealth(
+                connectorHandle.name(),
+                new ConnectorState(
+                    "RUNNING",
+                    workerId,
+                    null
+                ),
+                Collections.singletonMap(
+                    0,
+                    new TaskState(0, "RUNNING", workerId, null)
+                ),
+                ConnectorType.SINK
+            );
+
+            connectorProps.put(NAME_CONFIG, connectorHandle.name());
+
+            // Test the REST extension API; specifically, that the connector's health and configuration
+            // are available to the REST extension we registered and that they contain expected values
+            waitForCondition(
+                () -> verifyConnectorHealth(connectorHandle.name(), expectedHealth),
+                CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS,
+                "Connector health and/or config was never accessible by the REST extension"
+            );
+        } finally {
+            RuntimeHandles.get().deleteConnector(connectorHandle.name());
+        }
     }
 
     @After
     public void close() {
         // stop all Connect, Kafka and Zk threads.
         connect.stop();
+        IntegrationTestRestExtension.instance = null;
     }
 
     private boolean extensionIsRegistered() {
@@ -85,11 +144,38 @@ public class RestExtensionIntegrationTest {
         }
     }
 
+    private boolean verifyConnectorHealth(
+        String connectorName,
+        ConnectorHealth expectedHealth
+    ) {
+        ConnectClusterState clusterState =
+            IntegrationTestRestExtension.instance.restPluginContext.clusterState();
+        
+        ConnectorHealth actualHealth = clusterState.connectorHealth(connectorName);
+        if (actualHealth.tasksState().isEmpty()) {
+            // Happens if the task has been started but its status has not yet been picked up from
+            // the status topic by the worker.
+            return false;
+        }
+        assertEquals(expectedHealth, actualHealth);
+
+        return true;
+    }
+
     public static class IntegrationTestRestExtension implements ConnectRestExtension {
+        private static IntegrationTestRestExtension instance;
+
+        public ConnectRestExtensionContext restPluginContext;
 
         @Override
         public void register(ConnectRestExtensionContext restPluginContext) {
+            instance = this;
+            this.restPluginContext = restPluginContext;
+            // Immediately request a list of connectors to confirm that the context and its fields
+            // has been fully initialized and there is no risk of deadlock
             restPluginContext.clusterState().connectors();
+            // Install a new REST resource that can be used to confirm that the extension has been
+            // successfully registered
             restPluginContext.configurable().register(new IntegrationTestRestExtensionResource());
         }
     


Mime
View raw message