kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-4366: KafkaStreams.close() blocks indefinitely
Date Thu, 17 Nov 2016 20:49:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f3aad3b54 -> 2daa10d77


KAFKA-4366: KafkaStreams.close() blocks indefinitely

Added `timeout` and `timeUnit` parameters to `KafkaStreams.close(..)`. Close now runs on a
separate thread that is `join`ed with the provided `timeout`.
Changed `state` in `KafkaStreams` from int constants to an enum.
Added a system test to ensure we don't deadlock on close when an uncaught exception handler
that calls `System.exit(..)` is used together with a shutdown hook that calls `KafkaStreams.close(...)`.
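
For illustration, a minimal usage sketch of the bounded close added by this patch. The application id, broker address, and topic name below are placeholders, not part of the commit:

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class BoundedCloseExample {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bounded-close-example"); // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder broker

        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic"); // placeholder topic; any topology works here

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        // ... application runs ...

        // close() with no arguments still blocks until every stream thread has stopped.
        // The new overload waits at most 30 seconds and reports whether they all stopped.
        final boolean cleanlyStopped = streams.close(30, TimeUnit.SECONDS);
        if (!cleanlyStopped) {
            System.err.println("Some stream threads were still running after the timeout");
        }
    }
}
```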

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang

Closes #2097 from dguy/kafka-4366


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2daa10d7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2daa10d7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2daa10d7

Branch: refs/heads/trunk
Commit: 2daa10d77f8f177a8db6ff5de5c511165fedc2f5
Parents: f3aad3b
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Nov 17 12:49:20 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Nov 17 12:49:20 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  93 +++++++++++------
 .../processor/internals/StreamThread.java       |  11 +-
 .../apache/kafka/streams/KafkaStreamsTest.java  |  60 ++++++++++-
 .../streams/smoketest/ShutdownDeadlockTest.java | 100 +++++++++++++++++++
 .../streams/smoketest/StreamsSmokeTest.java     |   4 +
 tests/kafkatest/services/streams.py             |  13 ++-
 .../streams/streams_shutdown_deadlock_test.py   |  46 +++++++++
 7 files changed, 282 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2daa10d7/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index e120b31..6b35d24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -91,12 +91,10 @@ public class KafkaStreams {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
     private static final String JMX_PREFIX = "kafka.streams";
+    public static final int DEFAULT_CLOSE_TIMEOUT = 0;
 
-    // container states
-    private static final int CREATED = 0;
-    private static final int RUNNING = 1;
-    private static final int STOPPED = 2;
-    private int state = CREATED;
+    private enum StreamsState { created, running, stopped }
+    private StreamsState state = StreamsState.created;
 
     private final StreamThread[] threads;
     private final Metrics metrics;
@@ -192,14 +190,13 @@ public class KafkaStreams {
     public synchronized void start() {
         log.debug("Starting Kafka Stream process");
 
-        if (state == CREATED) {
-            for (final StreamThread thread : threads)
+        if (state == StreamsState.created) {
+            for (final StreamThread thread : threads) {
                 thread.start();
-
-            state = RUNNING;
-
+            }
+            state = StreamsState.running;
             log.info("Started Kafka Stream process");
-        } else if (state == RUNNING) {
+        } else if (state == StreamsState.running) {
             throw new IllegalStateException("This process was already started.");
         } else {
             throw new IllegalStateException("Cannot restart after closing.");
@@ -210,30 +207,60 @@ public class KafkaStreams {
      * Shutdown this stream instance by signaling all the threads to stop,
      * and then wait for them to join.
      *
-     * @throws IllegalStateException if process has not started yet
+     * This will block until all threads have stopped.
      */
-    public synchronized void close() {
-        log.debug("Stopping Kafka Stream process");
-
-        if (state == RUNNING) {
-            // signal the threads to stop and wait
-            for (final StreamThread thread : threads)
-                thread.close();
+    public void close() {
+        close(DEFAULT_CLOSE_TIMEOUT, TimeUnit.SECONDS);
+    }
 
-            for (final StreamThread thread : threads) {
-                try {
-                    thread.join();
-                } catch (final InterruptedException ex) {
-                    Thread.interrupted();
-                }
+    /**
+     * Shutdown this stream instance by signaling all the threads to stop,
+     * and then wait up to the timeout for the threads to join.
+     *
+     * A timeout of 0 means to wait forever
+     *
+     * @param timeout   how long to wait for {@link StreamThread}s to shutdown
+     * @param timeUnit  unit of time used for timeout
+     * @return true if all threads were successfully stopped
+     */
+    public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
+        log.debug("Stopping Kafka Stream process");
+        if (state == StreamsState.running) {
+            // save the current thread so that if it is a stream thread
+            // we don't attempt to join it and cause a deadlock
+            final Thread shutdown = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                        // signal the threads to stop and wait
+                        for (final StreamThread thread : threads) {
+                            thread.close();
+                        }
+
+                        for (final StreamThread thread : threads) {
+                            try {
+                                if (!thread.stillRunning()) {
+                                    thread.join();
+                                }
+                            } catch (final InterruptedException ex) {
+                                Thread.interrupted();
+                            }
+                        }
+
+                        metrics.close();
+                        log.info("Stopped Kafka Stream process");
+                    }
+            }, "kafka-streams-close-thread");
+            shutdown.setDaemon(true);
+            shutdown.start();
+            try {
+                shutdown.join(TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
+            } catch (InterruptedException e) {
+                Thread.interrupted();
             }
+            state = StreamsState.stopped;
+            return !shutdown.isAlive();
         }
-
-        if (state != STOPPED) {
-            metrics.close();
-            state = STOPPED;
-            log.info("Stopped Kafka Stream process");
-        }
+        return true;
 
     }
 
@@ -261,7 +288,7 @@ public class KafkaStreams {
      * @throws IllegalStateException if instance is currently running
      */
     public void cleanUp() {
-        if (state == RUNNING) {
+        if (state == StreamsState.running) {
             throw new IllegalStateException("Cannot clean up while running.");
         }
 
@@ -377,7 +404,7 @@ public class KafkaStreams {
     }
 
     private void validateIsRunning() {
-        if (state != RUNNING) {
+        if (state != StreamsState.running) {
             throw new IllegalStateException("KafkaStreams is not running");
         }
     }
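
The hunk above is the heart of the fix: the blocking shutdown work moves onto a daemon thread, and the caller bounds the wait with `Thread.join(millis)`, where a timeout of 0 means wait forever. A condensed, standalone restatement of that pattern follows (class and method names below are mine, and it re-asserts the interrupt flag rather than clearing it):

```java
import java.util.concurrent.TimeUnit;

public final class BoundedShutdown {

    /**
     * Runs the given shutdown work on a daemon thread and waits at most the
     * supplied timeout for it to finish. A timeout of 0 waits indefinitely,
     * because Thread.join(0) blocks until the thread dies.
     *
     * @return true if the shutdown work completed within the timeout
     */
    public static boolean closeWithTimeout(final Runnable shutdownWork,
                                           final long timeout,
                                           final TimeUnit timeUnit) {
        final Thread closer = new Thread(shutdownWork, "bounded-close-thread");
        closer.setDaemon(true); // don't keep the JVM alive if we give up waiting
        closer.start();
        try {
            closer.join(TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return !closer.isAlive(); // still alive means the timeout elapsed first
    }
}
```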

http://git-wip-us.apache.org/repos/asf/kafka/blob/2daa10d7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1cd8a39..a135a15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -290,6 +290,7 @@ public class StreamThread extends Thread {
         removeStandbyTasks();
 
         log.info("{} Stream thread shutdown complete", logPrefix);
+        running.set(false);
     }
 
     private void unAssignChangeLogPartitions(final boolean rethrowExceptions) {
@@ -492,6 +493,7 @@ public class StreamThread extends Thread {
 
             maybeClean();
         }
+        log.debug("{} Shutting down at user request", logPrefix);
     }
 
     private void maybeUpdateStandbyTasks() {
@@ -538,13 +540,8 @@ public class StreamThread extends Thread {
         }
     }
 
-    private boolean stillRunning() {
-        if (!running.get()) {
-            log.debug("{} Shutting down at user request", logPrefix);
-            return false;
-        }
-
-        return true;
+    public boolean stillRunning() {
+        return running.get();
     }
 
     private void maybePunctuate(StreamTask task) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2daa10d7/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 35b88db..e17e89f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -18,16 +18,25 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class KafkaStreamsTest {
@@ -56,7 +65,7 @@ public class KafkaStreamsTest {
         final int initCountDifference = newInitCount - oldInitCount;
         assertTrue("some reporters should be initialized by calling start()", initCountDifference
> 0);
 
-        streams.close();
+        assertTrue(streams.close(15, TimeUnit.SECONDS));
         Assert.assertEquals("each reporter initialized should also be closed",
             oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
     }
@@ -86,8 +95,8 @@ public class KafkaStreamsTest {
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
         streams.close();
-
         try {
             streams.start();
         } catch (final IllegalStateException e) {
@@ -147,6 +156,52 @@ public class KafkaStreamsTest {
         });
     }
 
+    @Test
+    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
+        final AtomicBoolean keepRunning = new AtomicBoolean(true);
+        try {
+            final Properties props = new Properties();
+            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+            final KStreamBuilder builder = new KStreamBuilder();
+            final CountDownLatch latch = new CountDownLatch(1);
+            final String topic = "input";
+            CLUSTER.createTopic(topic);
+
+            builder.stream(Serdes.String(), Serdes.String(), topic)
+                    .foreach(new ForeachAction<String, String>() {
+                        @Override
+                        public void apply(final String key, final String value) {
+                            try {
+                                latch.countDown();
+                                while (keepRunning.get()) {
+                                    Thread.sleep(10);
+                                }
+                            } catch (InterruptedException e) {
+                                // no-op
+                            }
+                        }
+                    });
+            final KafkaStreams streams = new KafkaStreams(builder, props);
+            streams.start();
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic,
+                                                                            Collections.singletonList(new KeyValue<>("A", "A")),
+                                                                            TestUtils.producerConfig(
+                                                                                    CLUSTER.bootstrapServers(),
+                                                                                    StringSerializer.class,
+                                                                                    StringSerializer.class,
+                                                                                    new Properties()),
+                                                                                    System.currentTimeMillis());
+
+            assertTrue("Timed out waiting to receive single message", latch.await(30, TimeUnit.SECONDS));
+            assertFalse(streams.close(10, TimeUnit.MILLISECONDS));
+        } finally {
+            // stop the thread so we don't interfere with other tests etc
+            keepRunning.set(false);
+        }
+    }
+
 
     private KafkaStreams createKafkaStreams() {
         final Properties props = new Properties();
@@ -191,5 +246,4 @@ public class KafkaStreamsTest {
             streams.close();
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2daa10d7/streams/src/test/java/org/apache/kafka/streams/smoketest/ShutdownDeadlockTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/ShutdownDeadlockTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/ShutdownDeadlockTest.java
new file mode 100644
index 0000000..7abbd0d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/ShutdownDeadlockTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.smoketest;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class ShutdownDeadlockTest {
+
+    private final String kafka;
+    private final String zookeeper;
+
+    public ShutdownDeadlockTest(final String kafka,
+                                final String zookeeper) {
+
+        this.kafka = kafka;
+        this.zookeeper = zookeeper;
+    }
+
+    public void start() {
+        final String topic = "source";
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "shouldNotDeadlock");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), topic);
+
+        source.foreach(new ForeachAction<String, String>() {
+            @Override
+            public void apply(final String key, final String value) {
+                throw new RuntimeException("KABOOM!");
+            }
+        });
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(final Thread t, final Throwable e) {
+                System.exit(-1);
+            }
+        });
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                streams.close(5, TimeUnit.SECONDS);
+            }
+        }));
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
+        producer.send(new ProducerRecord<>(topic, "a", "a"));
+        producer.flush();
+
+        streams.start();
+
+        synchronized (this) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                // ignored
+            }
+        }
+
+
+    }
+
+
+
+}
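
For readers unfamiliar with the failure mode this test exercises: `System.exit(..)` called from an uncaught exception handler blocks the failing thread until every shutdown hook has returned, while a hook that waits without a bound for that same thread can never return, so the JVM hangs. Here is a Kafka-free sketch of that shape (illustrative only; class and thread names are mine), with the bounded wait playing the role of `close(5, TimeUnit.SECONDS)`:

```java
public class ShutdownDeadlockSketch {

    public static void main(final String[] args) throws InterruptedException {
        final Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                throw new RuntimeException("KABOOM!");
            }
        }, "worker");

        worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(final Thread t, final Throwable e) {
                System.exit(-1); // blocks the worker until all shutdown hooks return
            }
        });

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // An unbounded worker.join() here would never return: the worker
                    // is stuck inside System.exit() waiting for this very hook.
                    worker.join(5000); // bounded wait breaks the cycle
                } catch (final InterruptedException e) {
                    // ignored
                }
            }
        }));

        worker.start();
        worker.join();
    }
}
```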

http://git-wip-us.apache.org/repos/asf/kafka/blob/2daa10d7/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
index c26544e..ce0bd2b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -64,6 +64,10 @@ public class StreamsSmokeTest {
                     }
                 });
                 break;
+            case "close-deadlock-test":
+                final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka, zookeeper);
+                test.start();
+                break;
             default:
                 System.out.println("unknown command: " + command);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2daa10d7/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 0fea33a..e54eb82 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -60,6 +60,10 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
         except:
             return []
 
+    def stop_nodes(self, clean_shutdown=True):
+        for node in self.nodes:
+            self.stop_node(node, clean_shutdown)
+
     def stop_node(self, node, clean_shutdown=True):
         self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Smoke Test on " + str(node.account))
         pids = self.pids(node)
@@ -80,6 +84,7 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
             self.stop_node(node)
             self.start_node(node)
 
+
     def abortThenRestart(self):
         # We don't want to do any clean up here, just abort then restart the process. The running service is killed immediately.
         for node in self.nodes:
@@ -88,10 +93,10 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
             self.logger.info("Restarting Kafka Streams on " + str(node.account))
             self.start_node(node)
 
-    def wait(self):
+    def wait(self, timeout_sec=180):
         for node in self.nodes:
             for pid in self.pids(node):
-                wait_until(lambda: not node.account.alive(pid), timeout_sec=180, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
 
     def clean_node(self, node):
         node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
@@ -137,3 +142,7 @@ class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, context, kafka):
         super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process")
+
+class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
+    def __init__(self, context, kafka):
+        super(StreamsSmokeTestShutdownDeadlockService, self).__init__(context, kafka, "close-deadlock-test")

http://git-wip-us.apache.org/repos/asf/kafka/blob/2daa10d7/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
new file mode 100644
index 0000000..5e4e7f2
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import ignore
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestShutdownDeadlockService
+
+class StreamsShutdownDeadlockTest(KafkaTest):
+    """
+    Simple test of Kafka Streams.
+    """
+
+    def __init__(self, test_context):
+        super(StreamsShutdownDeadlockTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'source' : { 'partitions': 1, 'replication-factor': 1 }
+        })
+
+        self.driver = StreamsSmokeTestShutdownDeadlockService(test_context, self.kafka)
+
+    def test_shutdown_wont_deadlock(self):
+        """
+        Start ShutdownDeadlockTest, wait for up to 1 minute, and check that the process exited.
+        If it hasn't exited by then, fail the test as it is deadlocked.
+        """
+
+        self.driver.start()
+
+        self.driver.wait(timeout_sec=60)
+
+        self.driver.stop_nodes(clean_shutdown=False)
+
+        self.driver.stop()
+

