kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Change Trogdor agent's cleanup executor to a cached thread pool (#6309)
Date Tue, 12 Mar 2019 15:32:11 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9a384da  MINOR: Change Trogdor agent's cleanup executor to a cached thread pool (#6309)
9a384da is described below

commit 9a384daf0eafb26e2e445c62659be78956c6e1c9
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Tue Mar 12 17:31:50 2019 +0200

    MINOR: Change Trogdor agent's cleanup executor to a cached thread pool (#6309)
    
    It is best to use a growing thread pool for worker cleanups. This lets us ensure that
    we close workers as fast as possible and do not get slowed down by blocking cleanups.
    
    Reviewers: Colin McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
---
 .../main/java/org/apache/kafka/trogdor/agent/WorkerManager.java  | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index ba0c3b5..5ef9299 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -85,7 +86,7 @@ public final class WorkerManager {
     /**
      * An ExecutorService used to clean up TaskWorkers.
      */
-    private final ScheduledExecutorService workerCleanupExecutor;
+    private final ExecutorService workerCleanupExecutor;
 
     /**
      * An ExecutorService to help with shutting down.
@@ -161,7 +162,7 @@ public final class WorkerManager {
         this.workers = new HashMap<>();
         this.stateChangeExecutor = Executors.newSingleThreadScheduledExecutor(
                 ThreadUtils.createThreadFactory("WorkerManagerStateThread", false));
-        this.workerCleanupExecutor = Executors.newScheduledThreadPool(1,
+        this.workerCleanupExecutor = Executors.newCachedThreadPool(
             ThreadUtils.createThreadFactory("WorkerCleanupThread%d", false));
         this.shutdownExecutor = Executors.newScheduledThreadPool(0,
             ThreadUtils.createThreadFactory("WorkerManagerShutdownThread%d", false));
@@ -289,13 +290,13 @@ public final class WorkerManager {
                 Math.max(0, spec.endMs() - time.milliseconds()));
         }
 
-        void transitionToStopping() {
+        Future<Void> transitionToStopping() {
             state = State.STOPPING;
             if (timeoutFuture != null) {
                 timeoutFuture.cancel(false);
                 timeoutFuture = null;
             }
-            workerCleanupExecutor.submit(new HaltWorker(this));
+            return workerCleanupExecutor.submit(new HaltWorker(this));
         }
 
         void transitionToDone() {


Mime
View raw message