kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-3816: Add MDC logging to Connect runtime (#5743)
Date Thu, 16 May 2019 11:35:42 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b395ef4  KAFKA-3816: Add MDC logging to Connect runtime (#5743)
b395ef4 is described below

commit b395ef418237a37b95d6452f0111998f63c047db
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Thu May 16 06:35:01 2019 -0500

    KAFKA-3816: Add MDC logging to Connect runtime (#5743)
    
    See https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs
    
    Added LoggingContext as a simple mechanism to set and unset Mapped Diagnostic Context (MDC) parameters in the loggers, giving each thread useful parameters that can be referenced in the logging configuration. MDC avoids having to modify lots of log statements, since the parameters are available to all log statements issued by the thread, no matter which class makes those calls.
    
    The design intentionally minimizes the number of changes to any existing classes, and does not use Java 8 features so it can be easily backported if desired, although per this KIP it will be applied initially only in AK 2.3 and later and must be enabled via the Log4J configuration.
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 config/connect-log4j.properties                    |   7 +
 .../connect/runtime/SourceTaskOffsetCommitter.java |   7 +-
 .../org/apache/kafka/connect/runtime/Worker.java   | 337 +++++++++++----------
 .../apache/kafka/connect/runtime/WorkerTask.java   |  38 ++-
 .../apache/kafka/connect/util/LoggingContext.java  | 227 ++++++++++++++
 .../runtime/SourceTaskOffsetCommitterTest.java     |   6 +
 .../kafka/connect/util/LoggingContextTest.java     | 207 +++++++++++++
 .../runtime/src/test/resources/log4j.properties    |  10 +-
 8 files changed, 655 insertions(+), 184 deletions(-)

diff --git a/config/connect-log4j.properties b/config/connect-log4j.properties
index 808addb..4e1e196 100644
--- a/config/connect-log4j.properties
+++ b/config/connect-log4j.properties
@@ -18,6 +18,13 @@ log4j.rootLogger=INFO, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+#
+# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
+# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
+# specific connector. Simply add this parameter to the log layout configuration below to include the contextual information.
+#
+#log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
 log4j.logger.org.apache.zookeeper=ERROR
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
index c502809..8e8d3fa 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.LoggingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +80,9 @@ class SourceTaskOffsetCommitter {
         ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
-                commit(workerTask);
+                try (LoggingContext loggingContext = LoggingContext.forOffsets(id)) {
+                    commit(workerTask);
+                }
             }
         }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
         committers.put(id, commitFuture);
@@ -90,7 +93,7 @@ class SourceTaskOffsetCommitter {
         if (task == null)
             return;
 
-        try {
+        try (LoggingContext loggingContext = LoggingContext.forTask(id)) {
             task.cancel(false);
             if (!task.isDone())
                 task.get();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e46feed..f6f7452 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -52,6 +52,7 @@ import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.LoggingContext;
 import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -225,38 +226,40 @@ public class Worker {
             ConnectorStatus.Listener statusListener,
             TargetState initialState
     ) {
-        if (connectors.containsKey(connName))
-            throw new ConnectException("Connector with name " + connName + " already exists");
-
-        final WorkerConnector workerConnector;
-        ClassLoader savedLoader = plugins.currentThreadLoader();
-        try {
-            final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
-            final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-            log.info("Creating connector {} of type {}", connName, connClass);
-            final Connector connector = plugins.newConnector(connClass);
-            workerConnector = new WorkerConnector(connName, connector, ctx, metrics,  statusListener);
-            log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
-            savedLoader = plugins.compareAndSwapLoaders(connector);
-            workerConnector.initialize(connConfig);
-            workerConnector.transitionTo(initialState);
-            Plugins.compareAndSwapLoaders(savedLoader);
-        } catch (Throwable t) {
-            log.error("Failed to start connector {}", connName, t);
-            // Can't be put in a finally block because it needs to be swapped before the call on
-            // statusListener
-            Plugins.compareAndSwapLoaders(savedLoader);
-            workerMetricsGroup.recordConnectorStartupFailure();
-            statusListener.onFailure(connName, t);
-            return false;
-        }
+        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+            if (connectors.containsKey(connName))
+                throw new ConnectException("Connector with name " + connName + " already exists");
+
+            final WorkerConnector workerConnector;
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+                final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+                log.info("Creating connector {} of type {}", connName, connClass);
+                final Connector connector = plugins.newConnector(connClass);
+                workerConnector = new WorkerConnector(connName, connector, ctx, metrics, statusListener);
+                log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
+                savedLoader = plugins.compareAndSwapLoaders(connector);
+                workerConnector.initialize(connConfig);
+                workerConnector.transitionTo(initialState);
+                Plugins.compareAndSwapLoaders(savedLoader);
+            } catch (Throwable t) {
+                log.error("Failed to start connector {}", connName, t);
+                // Can't be put in a finally block because it needs to be swapped before the call on
+                // statusListener
+                Plugins.compareAndSwapLoaders(savedLoader);
+                workerMetricsGroup.recordConnectorStartupFailure();
+                statusListener.onFailure(connName, t);
+                return false;
+            }
 
-        WorkerConnector existing = connectors.putIfAbsent(connName, workerConnector);
-        if (existing != null)
-            throw new ConnectException("Connector with name " + connName + " already exists");
+            WorkerConnector existing = connectors.putIfAbsent(connName, workerConnector);
+            if (existing != null)
+                throw new ConnectException("Connector with name " + connName + " already exists");
 
-        log.info("Finished creating connector {}", connName);
-        workerMetricsGroup.recordConnectorStartupSuccess();
+            log.info("Finished creating connector {}", connName);
+            workerMetricsGroup.recordConnectorStartupSuccess();
+        }
         return true;
     }
 
@@ -288,35 +291,37 @@ public class Worker {
      * @return a list of updated tasks properties.
      */
     public List<Map<String, String>> connectorTaskConfigs(String connName, ConnectorConfig connConfig) {
-        log.trace("Reconfiguring connector tasks for {}", connName);
-
-        WorkerConnector workerConnector = connectors.get(connName);
-        if (workerConnector == null)
-            throw new ConnectException("Connector " + connName + " not found in this worker.");
-
-        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
-        Map<String, String> connOriginals = connConfig.originalsStrings();
-
-        Connector connector = workerConnector.connector();
         List<Map<String, String>> result = new ArrayList<>();
-        ClassLoader savedLoader = plugins.currentThreadLoader();
-        try {
-            savedLoader = plugins.compareAndSwapLoaders(connector);
-            String taskClassName = connector.taskClass().getName();
-            for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
-                // Ensure we don't modify the connector's copy of the config
-                Map<String, String> taskConfig = new HashMap<>(taskProps);
-                taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
-                if (connOriginals.containsKey(SinkTask.TOPICS_CONFIG)) {
-                    taskConfig.put(SinkTask.TOPICS_CONFIG, connOriginals.get(SinkTask.TOPICS_CONFIG));
+        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+            log.trace("Reconfiguring connector tasks for {}", connName);
+
+            WorkerConnector workerConnector = connectors.get(connName);
+            if (workerConnector == null)
+                throw new ConnectException("Connector " + connName + " not found in this worker.");
+
+            int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
+            Map<String, String> connOriginals = connConfig.originalsStrings();
+
+            Connector connector = workerConnector.connector();
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                savedLoader = plugins.compareAndSwapLoaders(connector);
+                String taskClassName = connector.taskClass().getName();
+                for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
+                    // Ensure we don't modify the connector's copy of the config
+                    Map<String, String> taskConfig = new HashMap<>(taskProps);
+                    taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
+                    if (connOriginals.containsKey(SinkTask.TOPICS_CONFIG)) {
+                        taskConfig.put(SinkTask.TOPICS_CONFIG, connOriginals.get(SinkTask.TOPICS_CONFIG));
+                    }
+                    if (connOriginals.containsKey(SinkTask.TOPICS_REGEX_CONFIG)) {
+                        taskConfig.put(SinkTask.TOPICS_REGEX_CONFIG, connOriginals.get(SinkTask.TOPICS_REGEX_CONFIG));
+                    }
+                    result.add(taskConfig);
                 }
-                if (connOriginals.containsKey(SinkTask.TOPICS_REGEX_CONFIG)) {
-                    taskConfig.put(SinkTask.TOPICS_REGEX_CONFIG, connOriginals.get(SinkTask.TOPICS_REGEX_CONFIG));
-                }
-                result.add(taskConfig);
+            } finally {
+                Plugins.compareAndSwapLoaders(savedLoader);
             }
-        } finally {
-            Plugins.compareAndSwapLoaders(savedLoader);
         }
 
         return result;
@@ -336,23 +341,25 @@ public class Worker {
      * @return true if the connector belonged to this worker and was successfully stopped.
      */
     public boolean stopConnector(String connName) {
-        log.info("Stopping connector {}", connName);
+        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+            log.info("Stopping connector {}", connName);
 
-        WorkerConnector workerConnector = connectors.remove(connName);
-        if (workerConnector == null) {
-            log.warn("Ignoring stop request for unowned connector {}", connName);
-            return false;
-        }
+            WorkerConnector workerConnector = connectors.remove(connName);
+            if (workerConnector == null) {
+                log.warn("Ignoring stop request for unowned connector {}", connName);
+                return false;
+            }
 
-        ClassLoader savedLoader = plugins.currentThreadLoader();
-        try {
-            savedLoader = plugins.compareAndSwapLoaders(workerConnector.connector());
-            workerConnector.shutdown();
-        } finally {
-            Plugins.compareAndSwapLoaders(savedLoader);
-        }
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                savedLoader = plugins.compareAndSwapLoaders(workerConnector.connector());
+                workerConnector.shutdown();
+            } finally {
+                Plugins.compareAndSwapLoaders(savedLoader);
+            }
 
-        log.info("Stopped connector {}", connName);
+            log.info("Stopped connector {}", connName);
+        }
         return true;
     }
 
@@ -394,84 +401,78 @@ public class Worker {
             TaskStatus.Listener statusListener,
             TargetState initialState
     ) {
-        log.info("Creating task {}", id);
-
-        if (tasks.containsKey(id))
-            throw new ConnectException("Task already exists in this worker: " + id);
-
         final WorkerTask workerTask;
-        ClassLoader savedLoader = plugins.currentThreadLoader();
-        try {
-            final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
-            String connType = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-            ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
-            savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
-            final TaskConfig taskConfig = new TaskConfig(taskProps);
-            final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
-            final Task task = plugins.newTask(taskClass);
-            log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
-
-            // By maintaining connector's specific class loader for this thread here, we first
-            // search for converters within the connector dependencies.
-            // If any of these aren't found, that means the connector didn't configure specific converters,
-            // so we should instantiate based upon the worker configuration
-            Converter keyConverter = plugins.newConverter(
-                    connConfig,
-                    WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
-                    ClassLoaderUsage.CURRENT_CLASSLOADER
-            );
-            Converter valueConverter = plugins.newConverter(
-                    connConfig,
-                    WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
-                    ClassLoaderUsage.CURRENT_CLASSLOADER
-            );
-            HeaderConverter headerConverter = plugins.newHeaderConverter(
-                    connConfig,
-                    WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
-                    ClassLoaderUsage.CURRENT_CLASSLOADER
-            );
-            if (keyConverter == null) {
-                keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
-                log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
-            } else {
-                log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
-            }
-            if (valueConverter == null) {
-                valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
-                log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
-            } else {
-                log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
-            }
-            if (headerConverter == null) {
-                headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
-                log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
-            } else {
-                log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
-            }
+        try (LoggingContext loggingContext = LoggingContext.forTask(id)) {
+            log.info("Creating task {}", id);
+
+            if (tasks.containsKey(id))
+                throw new ConnectException("Task already exists in this worker: " + id);
+
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+                String connType = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+                final TaskConfig taskConfig = new TaskConfig(taskProps);
+                final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
+                final Task task = plugins.newTask(taskClass);
+                log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
+
+                // By maintaining connector's specific class loader for this thread here, we first
+                // search for converters within the connector dependencies.
+                // If any of these aren't found, that means the connector didn't configure specific converters,
+                // so we should instantiate based upon the worker configuration
+                Converter keyConverter = plugins.newConverter(connConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
+                                                                                                                           .CURRENT_CLASSLOADER);
+                Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
+                HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                                             ClassLoaderUsage.CURRENT_CLASSLOADER);
+                if (keyConverter == null) {
+                    keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                    log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
+                } else {
+                    log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
+                }
+                if (valueConverter == null) {
+                    valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                    log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
+                } else {
+                    log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
+                }
+                if (headerConverter == null) {
+                    headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
+                                                                                                                             .PLUGINS);
+                    log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
+                } else {
+                    log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
+                }
 
-            workerTask = buildWorkerTask(configState, connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
-            workerTask.initialize(taskConfig);
-            Plugins.compareAndSwapLoaders(savedLoader);
-        } catch (Throwable t) {
-            log.error("Failed to start task {}", id, t);
-            // Can't be put in a finally block because it needs to be swapped before the call on
-            // statusListener
-            Plugins.compareAndSwapLoaders(savedLoader);
-            workerMetricsGroup.recordTaskFailure();
-            statusListener.onFailure(id, t);
-            return false;
-        }
+                workerTask = buildWorkerTask(configState, connConfig, id, task, statusListener, initialState, keyConverter, valueConverter,
+                                             headerConverter, connectorLoader);
+                workerTask.initialize(taskConfig);
+                Plugins.compareAndSwapLoaders(savedLoader);
+            } catch (Throwable t) {
+                log.error("Failed to start task {}", id, t);
+                // Can't be put in a finally block because it needs to be swapped before the call on
+                // statusListener
+                Plugins.compareAndSwapLoaders(savedLoader);
+                workerMetricsGroup.recordTaskFailure();
+                statusListener.onFailure(id, t);
+                return false;
+            }
 
-        WorkerTask existing = tasks.putIfAbsent(id, workerTask);
-        if (existing != null)
-            throw new ConnectException("Task already exists in this worker: " + id);
+            WorkerTask existing = tasks.putIfAbsent(id, workerTask);
+            if (existing != null)
+                throw new ConnectException("Task already exists in this worker: " + id);
 
-        executor.submit(workerTask);
-        if (workerTask instanceof WorkerSourceTask) {
-            sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
+            executor.submit(workerTask);
+            if (workerTask instanceof WorkerSourceTask) {
+                sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
+            }
+            workerMetricsGroup.recordTaskSuccess();
+            return true;
         }
-        workerMetricsGroup.recordTaskSuccess();
-        return true;
     }
 
     private WorkerTask buildWorkerTask(ClusterConfigState configState,
@@ -592,22 +593,24 @@ public class Worker {
     }
 
     private void stopTask(ConnectorTaskId taskId) {
-        WorkerTask task = tasks.get(taskId);
-        if (task == null) {
-            log.warn("Ignoring stop request for unowned task {}", taskId);
-            return;
-        }
+        try (LoggingContext loggingContext = LoggingContext.forTask(taskId)) {
+            WorkerTask task = tasks.get(taskId);
+            if (task == null) {
+                log.warn("Ignoring stop request for unowned task {}", taskId);
+                return;
+            }
 
-        log.info("Stopping task {}", task.id());
-        if (task instanceof WorkerSourceTask)
-            sourceTaskOffsetCommitter.remove(task.id());
+            log.info("Stopping task {}", task.id());
+            if (task instanceof WorkerSourceTask)
+                sourceTaskOffsetCommitter.remove(task.id());
 
-        ClassLoader savedLoader = plugins.currentThreadLoader();
-        try {
-            savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-            task.stop();
-        } finally {
-            Plugins.compareAndSwapLoaders(savedLoader);
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                savedLoader = Plugins.compareAndSwapLoaders(task.loader());
+                task.stop();
+            } finally {
+                Plugins.compareAndSwapLoaders(savedLoader);
+            }
         }
     }
 
@@ -620,15 +623,19 @@ public class Worker {
     }
 
     private void awaitStopTask(ConnectorTaskId taskId, long timeout) {
-        WorkerTask task = tasks.remove(taskId);
-        if (task == null) {
-            log.warn("Ignoring await stop request for non-present task {}", taskId);
-            return;
-        }
+        try (LoggingContext loggingContext = LoggingContext.forTask(taskId)) {
+            WorkerTask task = tasks.remove(taskId);
+            if (task == null) {
+                log.warn("Ignoring await stop request for non-present task {}", taskId);
+                return;
+            }
 
-        if (!task.awaitStop(timeout)) {
-            log.error("Graceful stop of task {} failed.", task.id());
-            task.cancel();
+            if (!task.awaitStop(timeout)) {
+                log.error("Graceful stop of task {} failed.", task.id());
+                task.cancel();
+            } else {
+                log.debug("Graceful stop of task {} succeeded.", task.id());
+            }
         }
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 9cecb3d..28d2a8f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.LoggingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -215,27 +216,32 @@ abstract class WorkerTask implements Runnable {
 
     @Override
     public void run() {
-        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
-        String savedName = Thread.currentThread().getName();
-        try {
-            Thread.currentThread().setName(THREAD_NAME_PREFIX + id);
-            doRun();
-            onShutdown();
-        } catch (Throwable t) {
-            onFailure(t);
+        // Clear all MDC parameters, in case this thread is being reused
+        LoggingContext.clear();
 
-            if (t instanceof Error)
-                throw (Error) t;
-        } finally {
+        try (LoggingContext loggingContext = LoggingContext.forTask(id())) {
+            ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
+            String savedName = Thread.currentThread().getName();
             try {
-                Thread.currentThread().setName(savedName);
-                Plugins.compareAndSwapLoaders(savedLoader);
-                shutdownLatch.countDown();
+                Thread.currentThread().setName(THREAD_NAME_PREFIX + id);
+                doRun();
+                onShutdown();
+            } catch (Throwable t) {
+                onFailure(t);
+
+                if (t instanceof Error)
+                    throw (Error) t;
             } finally {
                 try {
-                    releaseResources();
+                    Thread.currentThread().setName(savedName);
+                    Plugins.compareAndSwapLoaders(savedLoader);
+                    shutdownLatch.countDown();
                 } finally {
-                    taskMetricsGroup.close();
+                    try {
+                        releaseResources();
+                    } finally {
+                        taskMetricsGroup.close();
+                    }
                 }
             }
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java
new file mode 100644
index 0000000..8df5f9c
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.slf4j.MDC;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A utility for defining Mapped Diagnostic Context (MDC) for SLF4J logs.
+ *
+ * <p>{@link LoggingContext} instances should be created in a try-with-resources block to ensure
+ * that the logging context is properly closed. The only exception is the logging context created
+ * upon thread creation that is to be used for the entire lifetime of the thread.
+ *
+ * <p>Every log message written by the thread carries the MDC context, so this mechanism is ideal for
+ * providing additional information in the log messages without requiring connector
+ * implementations to use a specific Connect API or SLF4J API. {@link LoggingContext#close()}
+ * will also properly restore the Connect MDC parameters to their state just prior to when the
+ * LoggingContext was created. Use {@link #clear()} to remove all MDC parameters from the
+ * current thread context.
+ *
+ * <p>Compare this approach to {@link org.apache.kafka.common.utils.LogContext}, which must be
+ * used to create a new {@link org.slf4j.Logger} instance pre-configured with the desired prefix.
+ * Currently LogContext does not allow the prefix to be changed, and it requires that all
+ * components use the LogContext to create their Logger instance.
+ */
+public final class LoggingContext implements AutoCloseable {
+
+    /**
+     * The name of the Mapped Diagnostic Context (MDC) key that defines the context for a connector.
+     */
+    public static final String CONNECTOR_CONTEXT = "connector.context";
+
+    public static final Collection<String> ALL_CONTEXTS = Collections.singleton(CONNECTOR_CONTEXT);
+
+    /**
+     * The Scope values used by Connect when specifying the context.
+     */
+    public enum Scope {
+        /**
+         * The scope value for the worker as it starts a connector.
+         */
+        WORKER("worker"),
+
+        /**
+         * The scope value for Task implementations.
+         */
+        TASK("task"),
+
+        /**
+         * The scope value for committing offsets.
+         */
+        OFFSETS("offsets"),
+
+        /**
+         * The scope value for validating connector configurations.
+         */
+        VALIDATE("validate");
+
+        private final String text;
+        Scope(String value) {
+            this.text = value;
+        }
+
+        @Override
+        public String toString() {
+            return text;
+        }
+    }
+
+    /**
+     * Clear all MDC parameters.
+     */
+    public static void clear() {
+        MDC.clear();
+    }
+
+    /**
+     * Modify the current {@link MDC} logging context to set the {@link #CONNECTOR_CONTEXT connector context} to include the
+     * supplied name and the {@link Scope#WORKER} scope.
+     *
+     * @param connectorName the connector name; may not be null
+     */
+    public static LoggingContext forConnector(String connectorName) {
+        Objects.requireNonNull(connectorName);
+        LoggingContext context = new LoggingContext();
+        MDC.put(CONNECTOR_CONTEXT, prefixFor(connectorName, Scope.WORKER, null));
+        return context;
+    }
+
+    /**
+     * Modify the current {@link MDC} logging context to set the {@link #CONNECTOR_CONTEXT connector context} to include the
+     * supplied connector name and the {@link Scope#VALIDATE} scope.
+     *
+     * @param connectorName the connector name
+     */
+    public static LoggingContext forValidation(String connectorName) {
+        LoggingContext context = new LoggingContext();
+        MDC.put(CONNECTOR_CONTEXT, prefixFor(connectorName, Scope.VALIDATE, null));
+        return context;
+    }
+
+    /**
+     * Modify the current {@link MDC} logging context to set the {@link #CONNECTOR_CONTEXT connector context} to include the
+     * connector name and task number using the supplied {@link ConnectorTaskId}, and to set the scope to {@link Scope#TASK}.
+     *
+     * @param id the connector task ID; may not be null
+     */
+    public static LoggingContext forTask(ConnectorTaskId id) {
+        Objects.requireNonNull(id);
+        LoggingContext context = new LoggingContext();
+        MDC.put(CONNECTOR_CONTEXT, prefixFor(id.connector(), Scope.TASK, id.task()));
+        return context;
+    }
+
+    /**
+     * Modify the current {@link MDC} logging context to set the {@link #CONNECTOR_CONTEXT connector context} to include the
+     * connector name and task number using the supplied {@link ConnectorTaskId}, and to set the scope to {@link Scope#OFFSETS}.
+     *
+     * @param id the connector task ID; may not be null
+     */
+    public static LoggingContext forOffsets(ConnectorTaskId id) {
+        Objects.requireNonNull(id);
+        LoggingContext context = new LoggingContext();
+        MDC.put(CONNECTOR_CONTEXT, prefixFor(id.connector(), Scope.OFFSETS, id.task()));
+        return context;
+    }
+
+    /**
+     * Return the prefix that uses the specified connector name, task number, and scope. The
+     * format is as follows:
+     *
+     * <pre>
+     *     [&lt;connectorName>|&lt;scope>]&lt;sp>
+     * </pre>
+     *
+     * where "<code>&lt;connectorName></code>" is the name of the connector,
+     * "<code>&lt;sp></code>" indicates a trailing space, and
+     * "<code>&lt;scope></code>" is one of the following:
+     *
+     * <ul>
+     *   <li>"<code>task-n</code>" for the operation of the numbered task, including calling the
+     *      task methods and the producer/consumer; here "n" is the 0-based task number
+     *   <li>"<code>task-n|offsets</code>" for the committing of source offsets for the numbered
+     *       task; here "n" is the 0-based task number
+     *   <li>"<code>worker</code>" for the creation and usage of connector instances
+     * </ul>
+     *
+     * <p>The following are examples of the connector context for a connector named "my-connector":
+     *
+     * <ul>
+     *   <li>`[my-connector|worker]` - used on log messages where the Connect worker is
+     *     validating the configuration for or starting/stopping the "my-connector" connector
+     *     via the SourceConnector / SinkConnector implementation methods.
+     *   <li>`[my-connector|task-0]` - used on log messages where the Connect worker is executing
+     *     task 0 of the "my-connector" connector, including calling any of the SourceTask /
+     *     SinkTask implementation methods, processing the messages for/from the task, and
+     *     calling the task's producer/consumer.
+     *   <li>`[my-connector|task-0|offsets]` - used on log messages where the Connect worker is
+     *       committing source offsets for task 0 of the "my-connector" connector.
+     * </ul>
+     *
+     * @param connectorName the name of the connector; may not be null
+     * @param scope the scope; may not be null
+     * @param taskNumber the 0-based task number; may be null if there is no associated task
+     * @return the prefix; never null
+     */
+    protected static String prefixFor(String connectorName, Scope scope, Integer taskNumber) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        sb.append(connectorName);
+        if (taskNumber != null) {
+            // There is a task number, so this is a task
+            sb.append("|");
+            sb.append(Scope.TASK.toString());
+            sb.append("-");
+            sb.append(taskNumber.toString());
+        }
+        // Append non-task scopes (e.g., worker and offsets)
+        if (scope != Scope.TASK) {
+            sb.append("|");
+            sb.append(scope.toString());
+        }
+        sb.append("] ");
+        return sb.toString();
+    }
+
+    private final Map<String, String> previous;
+
+    private LoggingContext() {
+        previous = MDC.getCopyOfContextMap(); // may be null!
+    }
+
+    /**
+     * Close this logging context, restoring the Connect {@link MDC} parameters back to the state
+     * just before this context was created. This does not affect other MDC parameters set by
+     * connectors or tasks.
+     */
+    @Override
+    public void close() {
+        for (String param : ALL_CONTEXTS) {
+            if (previous != null && previous.containsKey(param)) {
+                MDC.put(param, previous.get(param));
+            } else {
+                MDC.remove(param);
+            }
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
index baf0d8e..fb7ed82 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
@@ -147,6 +147,8 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
         EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
         EasyMock.expect(taskFuture.isDone()).andReturn(false);
         EasyMock.expect(taskFuture.get()).andReturn(null);
+        EasyMock.expect(taskId.connector()).andReturn("MyConnector");
+        EasyMock.expect(taskId.task()).andReturn(1);
         PowerMock.replayAll();
 
         committers.put(taskId, taskFuture);
@@ -160,6 +162,8 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
         EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
         EasyMock.expect(taskFuture.isDone()).andReturn(false);
         EasyMock.expect(taskFuture.get()).andThrow(new CancellationException());
+        EasyMock.expect(taskId.connector()).andReturn("MyConnector");
+        EasyMock.expect(taskId.task()).andReturn(1);
         mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject());
         PowerMock.expectLastCall();
         PowerMock.replayAll();
@@ -175,6 +179,8 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
         EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
         EasyMock.expect(taskFuture.isDone()).andReturn(false);
         EasyMock.expect(taskFuture.get()).andThrow(new InterruptedException());
+        EasyMock.expect(taskId.connector()).andReturn("MyConnector");
+        EasyMock.expect(taskId.task()).andReturn(1);
         PowerMock.replayAll();
 
         try {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/LoggingContextTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/LoggingContextTest.java
new file mode 100644
index 0000000..347e508
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/LoggingContextTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.connect.util.LoggingContext.Scope;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class LoggingContextTest {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggingContextTest.class);
+
+    private static final String CONNECTOR_NAME = "MyConnector";
+    private static final ConnectorTaskId TASK_ID1 = new ConnectorTaskId(CONNECTOR_NAME, 1);
+    private static final String EXTRA_KEY1 = "extra.key.1";
+    private static final String EXTRA_VALUE1 = "value1";
+    private static final String EXTRA_KEY2 = "extra.key.2";
+    private static final String EXTRA_VALUE2 = "value2";
+    private static final String EXTRA_KEY3 = "extra.key.3";
+    private static final String EXTRA_VALUE3 = "value3";
+
+    private Map<String, String> mdc;
+
+    @Before
+    public void setup() {
+        mdc = new HashMap<>();
+        Map<String, String> existing = MDC.getCopyOfContextMap();
+        if (existing != null) {
+            mdc.putAll(existing);
+        }
+        MDC.put(EXTRA_KEY1, EXTRA_VALUE1);
+        MDC.put(EXTRA_KEY2, EXTRA_VALUE2);
+    }
+
+    @After
+    public void tearDown() {
+        MDC.clear();
+        MDC.setContextMap(mdc);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullConnectorNameForConnectorContext() {
+        LoggingContext.forConnector(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTaskIdForTaskContext() {
+        LoggingContext.forTask(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTaskIdForOffsetContext() {
+        LoggingContext.forOffsets(null);
+    }
+
+    @Test
+    public void shouldCreateAndCloseLoggingContextEvenWithNullContextMap() {
+        MDC.clear();
+        assertMdc(null, null, null);
+        try (LoggingContext loggingContext = LoggingContext.forConnector(CONNECTOR_NAME)) {
+            assertMdc(CONNECTOR_NAME, null, Scope.WORKER);
+            log.info("Starting Connector");
+        }
+        assertMdc(null, null, null);
+    }
+
+    @Test
+    public void shouldCreateConnectorLoggingContext() {
+        assertMdcExtrasUntouched();
+        assertMdc(null, null, null);
+
+        try (LoggingContext loggingContext = LoggingContext.forConnector(CONNECTOR_NAME)) {
+            assertMdc(CONNECTOR_NAME, null, Scope.WORKER);
+            log.info("Starting Connector");
+        }
+
+        assertMdcExtrasUntouched();
+        assertMdc(null, null, null);
+    }
+
+    @Test
+    public void shouldCreateTaskLoggingContext() {
+        assertMdcExtrasUntouched();
+        try (LoggingContext loggingContext = LoggingContext.forTask(TASK_ID1)) {
+            assertMdc(TASK_ID1.connector(), TASK_ID1.task(), Scope.TASK);
+            log.info("Running task");
+        }
+
+        assertMdcExtrasUntouched();
+        assertMdc(null, null, null);
+    }
+
+    @Test
+    public void shouldCreateOffsetsLoggingContext() {
+        assertMdcExtrasUntouched();
+        try (LoggingContext loggingContext = LoggingContext.forOffsets(TASK_ID1)) {
+            assertMdc(TASK_ID1.connector(), TASK_ID1.task(), Scope.OFFSETS);
+            log.info("Running task");
+        }
+
+        assertMdcExtrasUntouched();
+        assertMdc(null, null, null);
+    }
+
+    @Test
+    public void shouldAllowNestedLoggingContexts() {
+        assertMdcExtrasUntouched();
+        assertMdc(null, null, null);
+        try (LoggingContext loggingContext1 = LoggingContext.forConnector(CONNECTOR_NAME)) {
+            assertMdc(CONNECTOR_NAME, null, Scope.WORKER);
+            log.info("Starting Connector");
+            // Set the extra MDC parameter, as if the connector were setting it
+            MDC.put(EXTRA_KEY3, EXTRA_VALUE3);
+            assertConnectorMdcSet();
+
+            try (LoggingContext loggingContext2 = LoggingContext.forTask(TASK_ID1)) {
+                assertMdc(TASK_ID1.connector(), TASK_ID1.task(), Scope.TASK);
+                log.info("Starting task");
+                // The extra connector-specific MDC parameter should still be set
+                assertConnectorMdcSet();
+
+                try (LoggingContext loggingContext3 = LoggingContext.forOffsets(TASK_ID1)) {
+                    assertMdc(TASK_ID1.connector(), TASK_ID1.task(), Scope.OFFSETS);
+                    assertConnectorMdcSet();
+                    log.info("Offsets for task");
+                }
+
+                assertMdc(TASK_ID1.connector(), TASK_ID1.task(), Scope.TASK);
+                log.info("Stopping task");
+                // The extra connector-specific MDC parameter should still be set
+                assertConnectorMdcSet();
+            }
+
+            assertMdc(CONNECTOR_NAME, null, Scope.WORKER);
+            log.info("Stopping Connector");
+            // The extra connector-specific MDC parameter should still be set
+            assertConnectorMdcSet();
+        }
+        assertMdcExtrasUntouched();
+        assertMdc(null, null, null);
+
+        // The extra connector-specific MDC parameter should still be set
+        assertConnectorMdcSet();
+
+        LoggingContext.clear();
+        assertConnectorMdcUnset();
+    }
+
+    protected void assertMdc(String connectorName, Integer taskId, Scope scope) {
+        String context = MDC.get(LoggingContext.CONNECTOR_CONTEXT);
+        if (context != null) {
+            assertEquals(
+                "Context should begin with connector name when the connector name is non-null",
+                connectorName != null,
+                context.startsWith("[" + connectorName)
+            );
+            if (scope != null) {
+                assertTrue("Context should contain the scope", context.contains(scope.toString()));
+            }
+            if (taskId != null) {
+                assertTrue("Context should contain the taskId", context.contains(taskId.toString()));
+            }
+        } else {
+            assertNull("No logging context found, expected null connector name", connectorName);
+            assertNull("No logging context found, expected null task ID", taskId);
+            assertNull("No logging context found, expected null scope", scope);
+        }
+    }
+
+    protected void assertMdcExtrasUntouched() {
+        assertEquals(EXTRA_VALUE1, MDC.get(EXTRA_KEY1));
+        assertEquals(EXTRA_VALUE2, MDC.get(EXTRA_KEY2));
+    }
+
+    protected void assertConnectorMdcSet() {
+        assertEquals(EXTRA_VALUE3, MDC.get(EXTRA_KEY3));
+    }
+
+    protected void assertConnectorMdcUnset() {
+        assertEquals(null, MDC.get(EXTRA_KEY3));
+    }
+}
\ No newline at end of file
diff --git a/connect/runtime/src/test/resources/log4j.properties b/connect/runtime/src/test/resources/log4j.properties
index a396aee..176692d 100644
--- a/connect/runtime/src/test/resources/log4j.properties
+++ b/connect/runtime/src/test/resources/log4j.properties
@@ -18,7 +18,15 @@ log4j.rootLogger=INFO, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] (%t) %p %m (%c:%L)%n
+#
+# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
+# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
+# specific connector. Simply add this parameter to the log layout configuration below to include the contextual information.
+#
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
+#
+# The following line includes no MDC context parameters:
+#log4j.appender.stdout.layout.ConversionPattern=[%d] (%t) %p %m (%c:%L)%n
 
 log4j.logger.org.reflections=ERROR
 log4j.logger.kafka=WARN


Mime
View raw message