From commits-return-9530-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Fri May 18 17:40:28 2018 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5489B1890D for ; Fri, 18 May 2018 17:40:28 +0000 (UTC) Received: (qmail 99600 invoked by uid 500); 18 May 2018 17:40:28 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 99560 invoked by uid 500); 18 May 2018 17:40:28 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 99551 invoked by uid 99); 18 May 2018 17:40:28 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 May 2018 17:40:28 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 204DC862F3; Fri, 18 May 2018 17:40:26 +0000 (UTC) Date: Fri, 18 May 2018 17:40:26 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-6566: Improve Connect Resource Cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152666522630.27466.16128514882166478393@gitbox.apache.org> From: ewencp@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 58a910f0a7e6e2da4af9f1fcebdabeb22d909fcb X-Git-Newrev: ee8abb2f7053575bd2abec8152907e0642b1d713 X-Git-Rev: ee8abb2f7053575bd2abec8152907e0642b1d713 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ee8abb2 KAFKA-6566: Improve Connect Resource Cleanup ee8abb2 is described below commit ee8abb2f7053575bd2abec8152907e0642b1d713 Author: Robert Yokota AuthorDate: Fri May 18 10:39:34 2018 -0700 KAFKA-6566: Improve Connect Resource Cleanup This is a change to improve resource cleanup for sink tasks and source tasks. Now `Task.stop()` is called from both `WorkerSinkTask.close()` and `WorkerSourceTask.close()`. It is called from `WorkerXXXTask.close()` since this method is called in the `finally` block of `WorkerTask.run()`, and Connect developers use `stop()` to clean up resources. Author: Robert Yokota Reviewers: Randall Hauch , Ewen Cheslack-Postava Closes #5020 from rayokota/K6566-improve-connect-resource-cleanup --- .../kafka/connect/runtime/WorkerSinkTask.java | 21 +++++++++++--- .../kafka/connect/runtime/WorkerSourceTask.java | 33 +++++++++++++++++++--- 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 2ba785c..6edcfd4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -148,10 +148,23 @@ class WorkerSinkTask extends WorkerTask { protected void close() { // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout // passed in - task.stop(); - if (consumer != null) - consumer.close(); - transformationChain.close(); + try { + task.stop(); + } catch (Throwable t) { + log.warn("Could not stop task", t); + } + if (consumer != null) { + try { + consumer.close(); + } catch (Throwable t) { + log.warn("Could not close consumer", t); + } + } + try { + transformationChain.close(); + } catch (Throwable t) { + log.warn("Could not close transformation chain", t); + } } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index f2cef5a..f17475d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -87,6 +87,7 @@ class WorkerSourceTask extends WorkerTask { private Map taskConfig; private boolean finishedStart = false; private boolean startedShutdownBeforeStartCompleted = false; + private boolean stopped = false; public WorkerSourceTask(ConnectorTaskId id, SourceTask task, @@ -137,8 +138,21 @@ class WorkerSourceTask extends WorkerTask { @Override protected void close() { - producer.close(30, TimeUnit.SECONDS); - transformationChain.close(); + if (!shouldPause()) { + tryStop(); + } + if (producer != null) { + try { + producer.close(30, TimeUnit.SECONDS); + } catch (Throwable t) { + log.warn("Could not close producer", t); + } + } + try { + transformationChain.close(); + } catch (Throwable t) { + log.warn("Could not close transformation chain", t); + } } @Override @@ -152,12 +166,23 @@ class WorkerSourceTask extends WorkerTask { stopRequestedLatch.countDown(); synchronized (this) { if (finishedStart) - task.stop(); + tryStop(); else startedShutdownBeforeStartCompleted = true; } } + private synchronized void tryStop() { + if (!stopped) { + try { + task.stop(); + stopped = true; + } catch (Throwable t) { + log.warn("Could not stop task", t); + } + } + } + @Override public void execute() { try { @@ -166,7 +191,7 @@ class WorkerSourceTask extends WorkerTask { log.info("{} Source task finished initialization and start", this); synchronized (this) { if (startedShutdownBeforeStartCompleted) { - task.stop(); + tryStop(); return; } finishedStart = true; -- To stop receiving notification emails like this one, please contact ewencp@apache.org.