kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-4306: Shutdown distributed herder with a timeout.
Date Tue, 06 Dec 2016 22:35:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 39f62ddce -> c9e99f297


KAFKA-4306: Shutdown distributed herder with a timeout.

Resolves

KAFKA-4306: Connect workers won't shut down if brokers are not available
KAFKA-4154: Kafka Connect fails to shutdown if it has not completed startup

Author: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Shikhar Bhushan <shikhar@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2201 from kkonstantine/KAFKA-4306-Connect-workers-will-not-shut-down-if-brokers-are-not-available


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9e99f29
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9e99f29
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9e99f29

Branch: refs/heads/trunk
Commit: c9e99f297f3090cd348e231dbeaf69c388de1234
Parents: 39f62dd
Author: Konstantine Karantasis <konstantine@confluent.io>
Authored: Tue Dec 6 14:34:52 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Dec 6 14:34:52 2016 -0800

----------------------------------------------------------------------
 .../runtime/distributed/DistributedHerder.java  | 42 +++++++++++---------
 1 file changed, 23 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9e99f29/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index ce2e72a..a8799c6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -59,9 +59,11 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -105,13 +107,14 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     private final String workerGroupId;
     private final int workerSyncTimeoutMs;
+    private final long workerTasksShutdownTimeoutMs;
     private final int workerUnsyncBackoffMs;
 
+    private final ExecutorService herderExecutor;
     private final ExecutorService forwardRequestExecutor;
     private final ExecutorService startAndStopExecutor;
     private final WorkerGroupMember member;
     private final AtomicBoolean stopping;
-    private final CountDownLatch stopLatch = new CountDownLatch(1);
 
     // Track enough information about the current membership state to be able to determine which requests via the API
     // and the from other nodes are safe to process
@@ -156,8 +159,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         this.time = time;
         this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
         this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
+        this.workerTasksShutdownTimeoutMs = config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
         this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
         this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(), time);
+        this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(1),
+                new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable herder) {
+                        return new Thread(herder, "DistributedHerder");
+                    }
+                });
         this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
         this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE);
 
@@ -170,8 +181,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     @Override
     public void start() {
-        Thread thread = new Thread(this, "DistributedHerder");
-        thread.start();
+        this.herderExecutor.submit(this);
     }
 
     @Override
@@ -192,10 +202,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             log.info("Herder stopped");
         } catch (Throwable t) {
             log.error("Uncaught exception in herder work thread, exiting: ", t);
-            stopLatch.countDown();
             System.exit(1);
-        } finally {
-            stopLatch.countDown();
         }
     }
 
@@ -369,20 +376,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
         stopping.set(true);
         member.wakeup();
-        while (stopLatch.getCount() > 0) {
-            try {
-                stopLatch.await();
-            } catch (InterruptedException e) {
-                // ignore, should not happen
-            }
-        }
-
-        forwardRequestExecutor.shutdown();
-        startAndStopExecutor.shutdown();
+        herderExecutor.shutdown();
         try {
-            if (!forwardRequestExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS))
+            if (!herderExecutor.awaitTermination(workerTasksShutdownTimeoutMs, TimeUnit.MILLISECONDS))
+                herderExecutor.shutdownNow();
+
+            forwardRequestExecutor.shutdown();
+            startAndStopExecutor.shutdown();
+
+            if (!forwardRequestExecutor.awaitTermination(10000L, TimeUnit.MILLISECONDS))
                 forwardRequestExecutor.shutdownNow();
-            if (!startAndStopExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS))
+            if (!startAndStopExecutor.awaitTermination(1000L, TimeUnit.MILLISECONDS))
                 startAndStopExecutor.shutdownNow();
         } catch (InterruptedException e) {
             // ignore


Mime
View raw message