kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3710: MemoryOffsetBackingStore shutdown
Date Fri, 27 May 2016 04:03:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3cf2de069 -> 9355427ef


KAFKA-3710: MemoryOffsetBackingStore shutdown

ExecutorService needs to be shutdown on close, lest a zombie thread
prevent clean shutdown.

ewencp

Author: Peter Davis <peter.davis@expeditors.com>

Reviewers: Liquan Pei <liquanpei@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1383 from davispw/KAFKA-3710


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9355427e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9355427e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9355427e

Branch: refs/heads/trunk
Commit: 9355427efc4c2c6d0f0d61dd956e5ad187da2dfc
Parents: 3cf2de0
Author: Peter Davis <peter.davis@expeditors.com>
Authored: Thu May 26 21:03:14 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu May 26 21:03:14 2016 -0700

----------------------------------------------------------------------
 .../storage/MemoryOffsetBackingStore.java       | 24 ++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9355427e/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
index 669e5f5..e319393 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.slf4j.Logger;
@@ -30,6 +31,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure
this
@@ -40,7 +42,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
     private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);
 
     protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
-    protected ExecutorService executor = Executors.newSingleThreadExecutor();
+    protected ExecutorService executor;
 
     public MemoryOffsetBackingStore() {
 
@@ -51,12 +53,26 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
     }
 
     @Override
-    public synchronized void start() {
+    public void start() {
+        executor = Executors.newSingleThreadExecutor();
     }
 
     @Override
-    public synchronized void stop() {
-        // Nothing to do since this doesn't maintain any outstanding connections/data
+    public void stop() {
+        if (executor != null) {
+            executor.shutdown();
+            // Best effort wait for any get() and set() tasks (and caller's callbacks) to
complete.
+            try {
+                executor.awaitTermination(30, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            if (!executor.shutdownNow().isEmpty()) {
+                throw new ConnectException("Failed to stop MemoryOffsetBackingStore. Exiting
without cleanly " +
+                        "shutting down pending tasks and/or callbacks.");
+            }
+            executor = null;
+        }
     }
 
     @Override


Mime
View raw message