kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 2.7 updated: Revert "KAFKA-10792: Prevent source task shutdown from blocking herder thread (#9669)"
Date Fri, 04 Dec 2020 18:08:45 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 3c6161a  Revert "KAFKA-10792: Prevent source task shutdown from blocking herder thread
(#9669)"
3c6161a is described below

commit 3c6161ac23f9093c46f347ed68beaa20567e2160
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Fri Dec 4 12:07:46 2020 -0600

    Revert "KAFKA-10792: Prevent source task shutdown from blocking herder thread (#9669)"
    
    This reverts commit d8b60939b6e14b8cd47e92520b9299ce5dfde5e5.
---
 .../kafka/connect/runtime/WorkerSourceTask.java    |  44 +-
 .../connect/integration/BlockingConnectorTest.java | 566 ++++-----------------
 .../connect/runtime/ErrorHandlingTaskTest.java     |   6 +
 .../ErrorHandlingTaskWithTopicCreationTest.java    |   6 +
 4 files changed, 150 insertions(+), 472 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 57f6f98..1febd7f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -102,7 +102,9 @@ class WorkerSourceTask extends WorkerTask {
     private CountDownLatch stopRequestedLatch;
 
     private Map<String, String> taskConfig;
-    private boolean started = false;
+    private boolean finishedStart = false;
+    private boolean startedShutdownBeforeStartCompleted = false;
+    private boolean stopped = false;
 
     public WorkerSourceTask(ConnectorTaskId id,
                             SourceTask task,
@@ -164,12 +166,8 @@ class WorkerSourceTask extends WorkerTask {
 
     @Override
     protected void close() {
-        if (started) {
-            try {
-                task.stop();
-            } catch (Throwable t) {
-                log.warn("Could not stop task", t);
-            }
+        if (!shouldPause()) {
+            tryStop();
         }
         if (producer != null) {
             try {
@@ -208,21 +206,39 @@ class WorkerSourceTask extends WorkerTask {
     public void stop() {
         super.stop();
         stopRequestedLatch.countDown();
+        synchronized (this) {
+            if (finishedStart)
+                tryStop();
+            else
+                startedShutdownBeforeStartCompleted = true;
+        }
+    }
+
+    private synchronized void tryStop() {
+        if (!stopped) {
+            try {
+                task.stop();
+                stopped = true;
+            } catch (Throwable t) {
+                log.warn("Could not stop task", t);
+            }
+        }
     }
 
     @Override
     public void execute() {
         try {
-            // If we try to start the task at all by invoking initialize, then count this
as
-            // "started" and expect a subsequent call to the task's stop() method
-            // to properly clean up any resources allocated by its initialize() or 
-            // start() methods. If the task throws an exception during stop(),
-            // the worst thing that happens is another exception gets logged for an already-
-            // failed task
-            started = true;
             task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
             task.start(taskConfig);
             log.info("{} Source task finished initialization and start", this);
+            synchronized (this) {
+                if (startedShutdownBeforeStartCompleted) {
+                    tryStop();
+                    return;
+                }
+                finishedStart = true;
+            }
+
             while (!isStopping()) {
                 if (shouldPause()) {
                     onPause();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index abc9a93..9a8e1fa 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -16,26 +16,17 @@
  */
 package org.apache.kafka.connect.integration;
 
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
-import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
@@ -45,21 +36,16 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
-import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertThrows;
 
@@ -81,33 +67,6 @@ public class BlockingConnectorTest {
     private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
     private static final long REST_REQUEST_TIMEOUT = Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS
* 2;
 
-    private static final String CONNECTOR_INITIALIZE = "Connector::initialize";
-    private static final String CONNECTOR_INITIALIZE_WITH_TASK_CONFIGS = "Connector::initializeWithTaskConfigs";
-    private static final String CONNECTOR_START = "Connector::start";
-    private static final String CONNECTOR_RECONFIGURE = "Connector::reconfigure";
-    private static final String CONNECTOR_TASK_CLASS = "Connector::taskClass";
-    private static final String CONNECTOR_TASK_CONFIGS = "Connector::taskConfigs";
-    private static final String CONNECTOR_STOP = "Connector::stop";
-    private static final String CONNECTOR_VALIDATE = "Connector::validate";
-    private static final String CONNECTOR_CONFIG = "Connector::config";
-    private static final String CONNECTOR_VERSION = "Connector::version";
-    private static final String TASK_START = "Task::start";
-    private static final String TASK_STOP = "Task::stop";
-    private static final String TASK_VERSION = "Task::version";
-    private static final String SINK_TASK_INITIALIZE = "SinkTask::initialize";
-    private static final String SINK_TASK_PUT = "SinkTask::put";
-    private static final String SINK_TASK_FLUSH = "SinkTask::flush";
-    private static final String SINK_TASK_PRE_COMMIT = "SinkTask::preCommit";
-    private static final String SINK_TASK_OPEN = "SinkTask::open";
-    private static final String SINK_TASK_ON_PARTITIONS_ASSIGNED = "SinkTask::onPartitionsAssigned";
-    private static final String SINK_TASK_CLOSE = "SinkTask::close";
-    private static final String SINK_TASK_ON_PARTITIONS_REVOKED = "SinkTask::onPartitionsRevoked";
-    private static final String SOURCE_TASK_INITIALIZE = "SourceTask::initialize";
-    private static final String SOURCE_TASK_POLL = "SourceTask::poll";
-    private static final String SOURCE_TASK_COMMIT = "SourceTask::commit";
-    private static final String SOURCE_TASK_COMMIT_RECORD = "SourceTask::commitRecord";
-    private static final String SOURCE_TASK_COMMIT_RECORD_WITH_METADATA = "SourceTask::commitRecordWithMetadata";
-
     private EmbeddedConnectCluster connect;
     private ConnectorHandle normalConnectorHandle;
 
@@ -142,18 +101,18 @@ public class BlockingConnectorTest {
         // stop all Connect, Kafka and Zk threads.
         connect.stop();
         ConnectorsResource.resetRequestTimeout();
-        Block.resetBlockLatch();
+        BlockingConnector.resetBlockLatch();
     }
 
     @Test
     public void testBlockInConnectorValidate() throws Exception {
         log.info("Starting test testBlockInConnectorValidate");
-        assertThrows(ConnectRestException.class, () -> createConnectorWithBlock(ValidateBlockingConnector.class,
CONNECTOR_VALIDATE));
+        assertThrows(ConnectRestException.class, () -> createConnectorWithBlock(ValidateBlockingConnector.class));
         // Will NOT assert that connector has failed, since the request should fail before
it's even created
 
         // Connector should already be blocked so this should return immediately, but check
just to
         // make sure that it actually did block
-        Block.waitForBlock();
+        BlockingConnector.waitForBlock();
 
         createNormalConnector();
         verifyNormalConnector();
@@ -162,12 +121,12 @@ public class BlockingConnectorTest {
     @Test
     public void testBlockInConnectorConfig() throws Exception {
         log.info("Starting test testBlockInConnectorConfig");
-        assertThrows(ConnectRestException.class, () -> createConnectorWithBlock(ConfigBlockingConnector.class,
CONNECTOR_CONFIG));
+        assertThrows(ConnectRestException.class, () -> createConnectorWithBlock(ConfigBlockingConnector.class));
         // Will NOT assert that connector has failed, since the request should fail before
it's even created
 
         // Connector should already be blocked so this should return immediately, but check
just to
         // make sure that it actually did block
-        Block.waitForBlock();
+        BlockingConnector.waitForBlock();
 
         createNormalConnector();
         verifyNormalConnector();
@@ -176,8 +135,8 @@ public class BlockingConnectorTest {
     @Test
     public void testBlockInConnectorInitialize() throws Exception {
         log.info("Starting test testBlockInConnectorInitialize");
-        createConnectorWithBlock(InitializeBlockingConnector.class, CONNECTOR_INITIALIZE);
-        Block.waitForBlock();
+        createConnectorWithBlock(InitializeBlockingConnector.class);
+        BlockingConnector.waitForBlock();
 
         createNormalConnector();
         verifyNormalConnector();
@@ -186,8 +145,8 @@ public class BlockingConnectorTest {
     @Test
     public void testBlockInConnectorStart() throws Exception {
         log.info("Starting test testBlockInConnectorStart");
-        createConnectorWithBlock(BlockingConnector.class, CONNECTOR_START);
-        Block.waitForBlock();
+        createConnectorWithBlock(BlockingConnector.START);
+        BlockingConnector.waitForBlock();
 
         createNormalConnector();
         verifyNormalConnector();
@@ -196,54 +155,10 @@ public class BlockingConnectorTest {
     @Test
     public void testBlockInConnectorStop() throws Exception {
         log.info("Starting test testBlockInConnectorStop");
-        createConnectorWithBlock(BlockingConnector.class, CONNECTOR_STOP);
-        waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
-        connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
-        Block.waitForBlock();
-
-        createNormalConnector();
-        verifyNormalConnector();
-    }
-
-    @Test
-    public void testBlockInSourceTaskStart() throws Exception {
-        log.info("Starting test testBlockInSourceTaskStart");
-        createConnectorWithBlock(BlockingSourceConnector.class, TASK_START);
-        Block.waitForBlock();
-
-        createNormalConnector();
-        verifyNormalConnector();
-    }
-
-    @Test
-    public void testBlockInSourceTaskStop() throws Exception {
-        log.info("Starting test testBlockInSourceTaskStop");
-        createConnectorWithBlock(BlockingSourceConnector.class, TASK_STOP);
+        createConnectorWithBlock(BlockingConnector.STOP);
         waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
         connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
-        Block.waitForBlock();
-
-        createNormalConnector();
-        verifyNormalConnector();
-    }
-
-    @Test
-    public void testBlockInSinkTaskStart() throws Exception {
-        log.info("Starting test testBlockInSinkTaskStart");
-        createConnectorWithBlock(BlockingSinkConnector.class, TASK_START);
-        Block.waitForBlock();
-
-        createNormalConnector();
-        verifyNormalConnector();
-    }
-
-    @Test
-    public void testBlockInSinkTaskStop() throws Exception {
-        log.info("Starting test testBlockInSinkTaskStop");
-        createConnectorWithBlock(BlockingSinkConnector.class, TASK_STOP);
-        waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
-        connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
-        Block.waitForBlock();
+        BlockingConnector.waitForBlock();
 
         createNormalConnector();
         verifyNormalConnector();
@@ -252,41 +167,50 @@ public class BlockingConnectorTest {
     @Test
     public void testWorkerRestartWithBlockInConnectorStart() throws Exception {
         log.info("Starting test testWorkerRestartWithBlockInConnectorStart");
-        createConnectorWithBlock(BlockingConnector.class, CONNECTOR_START);
+        createConnectorWithBlock(BlockingConnector.START);
         // First instance of the connector should block on startup
-        Block.waitForBlock();
+        BlockingConnector.waitForBlock();
         createNormalConnector();
         connect.removeWorker();
 
         connect.addWorker();
         // After stopping the only worker and restarting it, a new instance of the blocking
         // connector should be created and we can ensure that it blocks again
-        Block.waitForBlock();
+        BlockingConnector.waitForBlock();
         verifyNormalConnector();
     }
 
     @Test
     public void testWorkerRestartWithBlockInConnectorStop() throws Exception {
         log.info("Starting test testWorkerRestartWithBlockInConnectorStop");
-        createConnectorWithBlock(BlockingConnector.class, CONNECTOR_STOP);
+        createConnectorWithBlock(BlockingConnector.STOP);
         waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
         createNormalConnector();
         waitForConnectorStart(NORMAL_CONNECTOR_NAME);
         connect.removeWorker();
-        Block.waitForBlock();
+        BlockingConnector.waitForBlock();
 
         connect.addWorker();
         waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
         verifyNormalConnector();
     }
 
-    private void createConnectorWithBlock(Class<? extends Connector> connectorClass,
String block) {
-        Map<String, String> props = new HashMap<>();
+    private void createConnectorWithBlock(String block) {
+        Map<String, String> props = baseBlockingConnectorProps();
+        props.put(BlockingConnector.BLOCK_CONFIG, block);
+        log.info("Creating connector with block during {}", block);
+        try {
+            connect.configureConnector(BLOCKING_CONNECTOR_NAME, props);
+        } catch (RuntimeException e) {
+            log.info("Failed to create connector", e);
+            throw e;
+        }
+    }
+
+    private void createConnectorWithBlock(Class<? extends BlockingConnector> connectorClass)
{
+        Map<String, String> props = baseBlockingConnectorProps();
         props.put(CONNECTOR_CLASS_CONFIG, connectorClass.getName());
-        props.put(TASKS_MAX_CONFIG, "1");
-        props.put(TOPICS_CONFIG, "t1"); // Required for sink connectors
-        props.put(Block.BLOCK_CONFIG, Objects.requireNonNull(block));
-        log.info("Creating blocking connector of type {} with block in {}", connectorClass.getSimpleName(),
block);
+        log.info("Creating blocking connector of type {}", connectorClass.getSimpleName());
         try {
             connect.configureConnector(BLOCKING_CONNECTOR_NAME, props);
         } catch (RuntimeException e) {
@@ -295,6 +219,13 @@ public class BlockingConnectorTest {
         }
     }
 
+    private Map<String, String> baseBlockingConnectorProps() {
+        Map<String, String> result = new HashMap<>();
+        result.put(CONNECTOR_CLASS_CONFIG, BlockingConnector.class.getName());
+        result.put(TASKS_MAX_CONFIG, "1");
+        return result;
+    }
+
     private void createNormalConnector() {
         connect.kafka().createTopic(TEST_TOPIC, 3);
 
@@ -332,43 +263,63 @@ public class BlockingConnectorTest {
         normalConnectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
     }
 
-    private static class Block {
+    public static class BlockingConnector extends SourceConnector {
+
         private static CountDownLatch blockLatch;
 
-        private final String block;
+        private String block;
 
         public static final String BLOCK_CONFIG = "block";
 
-        private static ConfigDef config() {
-            return new ConfigDef()
-                .define(
-                    BLOCK_CONFIG,
-                    ConfigDef.Type.STRING,
-                    "",
-                    ConfigDef.Importance.MEDIUM,
-                    "Where to block indefinitely, e.g., 'Connector::start', 'Connector::initialize',
" 
-                        + "'Connector::taskConfigs', 'Task::version', 'SinkTask::put', 'SourceTask::poll'"
-                );
+        public static final String INITIALIZE = "initialize";
+        public static final String INITIALIZE_WITH_TASK_CONFIGS = "initializeWithTaskConfigs";
+        public static final String START = "start";
+        public static final String RECONFIGURE = "reconfigure";
+        public static final String TASK_CLASS = "taskClass";
+        public static final String TASK_CONFIGS = "taskConfigs";
+        public static final String STOP = "stop";
+        public static final String VALIDATE = "validate";
+        public static final String CONFIG = "config";
+        public static final String VERSION = "version";
+
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(
+                BLOCK_CONFIG,
+                ConfigDef.Type.STRING,
+                "",
+                ConfigDef.Importance.MEDIUM,
+                "Where to block indefinitely, e.g., 'start', 'initialize', 'taskConfigs',
'version'"
+            );
+
+        // No-args constructor required by the framework
+        public BlockingConnector() {
+            this(null);
+        }
+
+        protected BlockingConnector(String block) {
+            this.block = block;
+            synchronized (BlockingConnector.class) {
+                if (blockLatch != null) {
+                    blockLatch.countDown();
+                }
+                blockLatch = new CountDownLatch(1);
+            }
         }
 
         public static void waitForBlock() throws InterruptedException {
-            synchronized (Block.class) {
+            synchronized (BlockingConnector.class) {
                 if (blockLatch == null) {
                     throw new IllegalArgumentException("No connector has been created yet");
                 }
             }
-
+            
             log.debug("Waiting for connector to block");
             blockLatch.await();
             log.debug("Connector should now be blocked");
         }
 
-        // Note that there is only ever at most one global block latch at a time, which makes
tests that
-        // use blocks in multiple places impossible. If necessary, this can be addressed
in the future by
-        // adding support for multiple block latches at a time, possibly identifiable by
a connector/task
-        // ID, the location of the expected block, or both.
         public static void resetBlockLatch() {
-            synchronized (Block.class) {
+            synchronized (BlockingConnector.class) {
                 if (blockLatch != null) {
                     blockLatch.countDown();
                     blockLatch = null;
@@ -376,114 +327,81 @@ public class BlockingConnectorTest {
             }
         }
 
-        public Block(Map<String, String> props) {
-            this(new AbstractConfig(config(), props).getString(BLOCK_CONFIG));
-        }
-
-        public Block(String block) {
-            this.block = block;
-            synchronized (Block.class) {
-                if (blockLatch != null) {
-                    blockLatch.countDown();
-                }
-                blockLatch = new CountDownLatch(1);
-            }
-        }
-
-        public Map<String, String> taskConfig() {
-            return Collections.singletonMap(BLOCK_CONFIG, block);
-        }
-
-        public void maybeBlockOn(String block) {
-            if (block.equals(this.block)) {
-                log.info("Will block on {}", block);
-                blockLatch.countDown();
-                while (true) {
-                    try {
-                        Thread.sleep(Long.MAX_VALUE);
-                    } catch (InterruptedException e) {
-                        // No-op. Just keep blocking.
-                    }
-                }
-            } else {
-                log.debug("Will not block on {}", block);
-            }
-        }
-    }
-
-    // Used to test blocks in Connector (as opposed to Task) methods
-    public static class BlockingConnector extends SourceConnector {
-
-        private Block block;
-
-        // No-args constructor required by the framework
-        public BlockingConnector() {
-            this(null);
-        }
-
-        protected BlockingConnector(String block) {
-            this.block = new Block(block);
-        }
-
         @Override
         public void initialize(ConnectorContext ctx) {
-            block.maybeBlockOn(CONNECTOR_INITIALIZE);
+            maybeBlockOn(INITIALIZE);
             super.initialize(ctx);
         }
 
         @Override
         public void initialize(ConnectorContext ctx, List<Map<String, String>>
taskConfigs) {
-            block.maybeBlockOn(CONNECTOR_INITIALIZE_WITH_TASK_CONFIGS);
+            maybeBlockOn(INITIALIZE_WITH_TASK_CONFIGS);
             super.initialize(ctx, taskConfigs);
         }
 
         @Override
         public void start(Map<String, String> props) {
-            this.block = new Block(props);
-            block.maybeBlockOn(CONNECTOR_START);
+            this.block = new AbstractConfig(CONFIG_DEF, props).getString(BLOCK_CONFIG);
+            maybeBlockOn(START);
         }
 
         @Override
         public void reconfigure(Map<String, String> props) {
-            block.maybeBlockOn(CONNECTOR_RECONFIGURE);
             super.reconfigure(props);
+            maybeBlockOn(RECONFIGURE);
         }
 
         @Override
         public Class<? extends Task> taskClass() {
-            block.maybeBlockOn(CONNECTOR_TASK_CLASS);
+            maybeBlockOn(TASK_CLASS);
             return BlockingTask.class;
         }
 
         @Override
         public List<Map<String, String>> taskConfigs(int maxTasks) {
-            block.maybeBlockOn(CONNECTOR_TASK_CONFIGS);
+            maybeBlockOn(TASK_CONFIGS);
             return Collections.singletonList(Collections.emptyMap());
         }
 
         @Override
         public void stop() {
-            block.maybeBlockOn(CONNECTOR_STOP);
+            maybeBlockOn(STOP);
         }
 
         @Override
         public Config validate(Map<String, String> connectorConfigs) {
-            block.maybeBlockOn(CONNECTOR_VALIDATE);
+            maybeBlockOn(VALIDATE);
             return super.validate(connectorConfigs);
         }
 
         @Override
         public ConfigDef config() {
-            block.maybeBlockOn(CONNECTOR_CONFIG);
-            return Block.config();
+            maybeBlockOn(CONFIG);
+            return CONFIG_DEF;
         }
 
         @Override
         public String version() {
-            block.maybeBlockOn(CONNECTOR_VERSION);
+            maybeBlockOn(VERSION);
             return "0.0.0";
         }
 
+        protected void maybeBlockOn(String block) {
+            if (block.equals(this.block)) {
+                log.info("Will block on {}", block);
+                blockLatch.countDown();
+                while (true) {
+                    try {
+                        Thread.sleep(Long.MAX_VALUE);
+                    } catch (InterruptedException e) {
+                        // No-op. Just keep blocking.
+                    }
+                }
+            } else {
+                log.debug("Will not block on {}", block);
+            }
+        }
+
         public static class BlockingTask extends SourceTask {
             @Override
             public void start(Map<String, String> props) {
@@ -508,287 +426,19 @@ public class BlockingConnectorTest {
     // Some methods are called before Connector::start, so we use this as a workaround
     public static class InitializeBlockingConnector extends BlockingConnector {
         public InitializeBlockingConnector() {
-            super(CONNECTOR_INITIALIZE);
+            super(INITIALIZE);
         }
     }
 
     public static class ConfigBlockingConnector extends BlockingConnector {
         public ConfigBlockingConnector() {
-            super(CONNECTOR_CONFIG);
+            super(CONFIG);
         }
     }
 
     public static class ValidateBlockingConnector extends BlockingConnector {
         public ValidateBlockingConnector() {
-            super(CONNECTOR_VALIDATE);
-        }
-    }
-
-    // Used to test blocks in SourceTask methods
-    public static class BlockingSourceConnector extends SourceConnector {
-
-        private Map<String, String> props;
-        private final Class<? extends BlockingSourceTask> taskClass;
-
-        // No-args constructor required by the framework
-        public BlockingSourceConnector() {
-            this(BlockingSourceTask.class);
-        }
-
-        protected BlockingSourceConnector(Class<? extends BlockingSourceTask> taskClass)
{
-            this.taskClass = taskClass;
-        }
-
-        @Override
-        public void start(Map<String, String> props) {
-            this.props = props;
-        }
-
-        @Override
-        public Class<? extends Task> taskClass() {
-            return taskClass;
-        }
-
-        @Override
-        public List<Map<String, String>> taskConfigs(int maxTasks) {
-            return IntStream.range(0, maxTasks)
-                .mapToObj(i -> new HashMap<>(props))
-                .collect(Collectors.toList());
-        }
-
-        @Override
-        public void stop() {
-        }
-
-        @Override
-        public Config validate(Map<String, String> connectorConfigs) {
-            return super.validate(connectorConfigs);
-        }
-
-        @Override
-        public ConfigDef config() {
-            return Block.config();
-        }
-
-        @Override
-        public String version() {
-            return "0.0.0";
-        }
-
-        public static class BlockingSourceTask extends SourceTask {
-            private Block block;
-
-            // No-args constructor required by the framework
-            public BlockingSourceTask() {
-                this(null);
-            }
-
-            protected BlockingSourceTask(String block) {
-                this.block = new Block(block);
-            }
-
-            @Override
-            public void start(Map<String, String> props) {
-                this.block = new Block(props);
-                block.maybeBlockOn(TASK_START);
-            }
-
-            @Override
-            public List<SourceRecord> poll() {
-                block.maybeBlockOn(SOURCE_TASK_POLL);
-                return null;
-            }
-
-            @Override
-            public void stop() {
-                block.maybeBlockOn(TASK_STOP);
-            }
-
-            @Override
-            public String version() {
-                block.maybeBlockOn(TASK_VERSION);
-                return "0.0.0";
-            }
-
-            @Override
-            public void initialize(SourceTaskContext context) {
-                block.maybeBlockOn(SOURCE_TASK_INITIALIZE);
-                super.initialize(context);
-            }
-
-            @Override
-            public void commit() throws InterruptedException {
-                block.maybeBlockOn(SOURCE_TASK_COMMIT);
-                super.commit();
-            }
-
-            @Override
-            @SuppressWarnings("deprecation")
-            public void commitRecord(SourceRecord record) throws InterruptedException {
-                block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD);
-                super.commitRecord(record);
-            }
-
-            @Override
-            public void commitRecord(SourceRecord record, RecordMetadata metadata) throws
InterruptedException {
-                block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD_WITH_METADATA);
-                super.commitRecord(record, metadata);
-            }
-        }
-    }
-
-    public static class TaskInitializeBlockingSourceConnector extends BlockingSourceConnector
{
-        public TaskInitializeBlockingSourceConnector() {
-            super(InitializeBlockingSourceTask.class);
-        }
-
-        public static class InitializeBlockingSourceTask extends BlockingSourceTask {
-            public InitializeBlockingSourceTask() {
-                super(SOURCE_TASK_INITIALIZE);
-            }
-        }
-    }
-
-    // Used to test blocks in SinkTask methods
-    public static class BlockingSinkConnector extends SinkConnector {
-
-        private Map<String, String> props;
-        private final Class<? extends BlockingSinkTask> taskClass;
-
-        // No-args constructor required by the framework
-        public BlockingSinkConnector() {
-            this(BlockingSinkTask.class);
-        }
-
-        protected BlockingSinkConnector(Class<? extends BlockingSinkTask> taskClass)
{
-            this.taskClass = taskClass;
-        }
-
-        @Override
-        public void start(Map<String, String> props) {
-            this.props = props;
-        }
-
-        @Override
-        public Class<? extends Task> taskClass() {
-            return taskClass;
-        }
-
-        @Override
-        public List<Map<String, String>> taskConfigs(int maxTasks) {
-            return IntStream.rangeClosed(0, maxTasks)
-                .mapToObj(i -> new HashMap<>(props))
-                .collect(Collectors.toList());
-        }
-
-        @Override
-        public void stop() {
-        }
-
-        @Override
-        public Config validate(Map<String, String> connectorConfigs) {
-            return super.validate(connectorConfigs);
-        }
-
-        @Override
-        public ConfigDef config() {
-            return Block.config();
-        }
-
-        @Override
-        public String version() {
-            return "0.0.0";
-        }
-
-        public static class BlockingSinkTask extends SinkTask {
-            private Block block;
-
-            // No-args constructor required by the framework
-            public BlockingSinkTask() {
-                this(null);
-            }
-
-            protected BlockingSinkTask(String block) {
-                this.block = new Block(block);
-            }
-
-            @Override
-            public void start(Map<String, String> props) {
-                this.block = new Block(props);
-                block.maybeBlockOn(TASK_START);
-            }
-
-            @Override
-            public void put(Collection<SinkRecord> records) {
-                block.maybeBlockOn(SINK_TASK_PUT);
-            }
-
-            @Override
-            public void stop() {
-                block.maybeBlockOn(TASK_STOP);
-            }
-
-            @Override
-            public String version() {
-                block.maybeBlockOn(TASK_VERSION);
-                return "0.0.0";
-            }
-
-            @Override
-            public void initialize(SinkTaskContext context) {
-                block.maybeBlockOn(SINK_TASK_INITIALIZE);
-                super.initialize(context);
-            }
-
-            @Override
-            public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets)
{
-                block.maybeBlockOn(SINK_TASK_FLUSH);
-                super.flush(currentOffsets);
-            }
-
-            @Override
-            public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition,
OffsetAndMetadata> currentOffsets) {
-                block.maybeBlockOn(SINK_TASK_PRE_COMMIT);
-                return super.preCommit(currentOffsets);
-            }
-
-            @Override
-            public void open(Collection<TopicPartition> partitions) {
-                block.maybeBlockOn(SINK_TASK_OPEN);
-                super.open(partitions);
-            }
-
-            @Override
-            @SuppressWarnings("deprecation")
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
-                block.maybeBlockOn(SINK_TASK_ON_PARTITIONS_ASSIGNED);
-                super.onPartitionsAssigned(partitions);
-            }
-
-            @Override
-            public void close(Collection<TopicPartition> partitions) {
-                block.maybeBlockOn(SINK_TASK_CLOSE);
-                super.close(partitions);
-            }
-
-            @Override
-            @SuppressWarnings("deprecation")
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
-                block.maybeBlockOn(SINK_TASK_ON_PARTITIONS_REVOKED);
-                super.onPartitionsRevoked(partitions);
-            }
-        }
-    }
-
-    public static class TaskInitializeBlockingSinkConnector extends BlockingSinkConnector
{
-        public TaskInitializeBlockingSinkConnector() {
-            super(InitializeBlockingSinkTask.class);
-        }
-
-        public static class InitializeBlockingSinkTask extends BlockingSinkTask {
-            public InitializeBlockingSinkTask() {
-                super(SINK_TASK_INITIALIZE);
-            }
+            super(VALIDATE);
         }
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 70bbfc6..c471b03 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -236,6 +236,9 @@ public class ErrorHandlingTaskTest {
 
         createSourceTask(initialState, retryWithToleranceOperator);
 
+        sourceTask.stop();
+        PowerMock.expectLastCall();
+
         expectClose();
 
         reporter.close();
@@ -260,6 +263,9 @@ public class ErrorHandlingTaskTest {
 
         createSourceTask(initialState, retryWithToleranceOperator);
 
+        sourceTask.stop();
+        PowerMock.expectLastCall();
+
         expectClose();
 
         // Even though the reporters throw exceptions, they should both still be closed.
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
index 9c115ac..aba6445 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
@@ -250,6 +250,9 @@ public class ErrorHandlingTaskWithTopicCreationTest {
 
         createSourceTask(initialState, retryWithToleranceOperator);
 
+        sourceTask.stop();
+        PowerMock.expectLastCall();
+
         expectClose();
 
         reporter.close();
@@ -274,6 +277,9 @@ public class ErrorHandlingTaskWithTopicCreationTest {
 
         createSourceTask(initialState, retryWithToleranceOperator);
 
+        sourceTask.stop();
+        PowerMock.expectLastCall();
+
         expectClose();
 
         // Even though the reporters throw exceptions, they should both still be closed.


Mime
View raw message