kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6383: Complete shut down for streams threads that have not started
Date Thu, 11 Jan 2018 00:24:54 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4d544d4  KAFKA-6383: Complete shut down for streams threads that have not started
4d544d4 is described below

commit 4d544d4f08b027ce318b088b112f03dccc566098
Author: Rohan Desai <desai.p.rohan@gmail.com>
AuthorDate: Wed Jan 10 16:24:43 2018 -0800

    KAFKA-6383: Complete shut down for streams threads that have not started
    
    *More detailed description of your change,
    if necessary. The PR title and PR message become
    the squashed commit message, so use a separate
    comment to ping reviewers.*
    
    *Summary of testing strategy (including rationale)
    for the feature or bug fix. Unit and/or integration
    tests are expected for any behaviour change and
    system tests should be considered for larger changes.*
    
    Author: Rohan Desai <desai.p.rohan@gmail.com>
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
    
    Closes #4382 from rodesai/KAFKA-6383
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 22 +++----
 .../streams/processor/internals/StreamThread.java  | 30 ++++++---
 .../streams/processor/internals/TaskManager.java   |  6 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 34 ++++++++++
 .../processor/internals/StreamThreadTest.java      | 76 +++++++++++++++++++++-
 5 files changed, 143 insertions(+), 25 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c7ae613..1a70e46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -817,23 +817,23 @@ public class KafkaStreams {
         } else {
             stateDirCleaner.shutdownNow();
 
-            // notify all the threads to stop; avoid deadlocks by stopping any
-            // further state reports from the thread since we're shutting down
-            for (final StreamThread thread : threads) {
-                thread.setStateListener(null);
-                thread.shutdown();
-            }
-            if (globalStreamThread != null) {
-                globalStreamThread.setStateListener(null);
-                globalStreamThread.shutdown();
-            }
-
             // wait for all threads to join in a separate thread;
             // save the current thread so that if it is a stream thread
             // we don't attempt to join it and cause a deadlock
             final Thread shutdownThread = new Thread(new Runnable() {
                 @Override
                 public void run() {
+                    // notify all the threads to stop; avoid deadlocks by stopping any
+                    // further state reports from the thread since we're shutting down
+                    for (final StreamThread thread : threads) {
+                        thread.setStateListener(null);
+                        thread.shutdown();
+                    }
+                    if (globalStreamThread != null) {
+                        globalStreamThread.setStateListener(null);
+                        globalStreamThread.shutdown();
+                    }
+
                     for (final StreamThread thread : threads) {
                         try {
                             if (!thread.isRunning()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 696081d..cb133c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -173,23 +173,26 @@ public class StreamThread extends Thread {
     /**
      * Sets the state
      * @param newState New state
+     * @return The state prior to the call to setState, or null if the transition is invalid
      */
-    boolean setState(final State newState) {
-        final State oldState = state;
+    State setState(final State newState) {
+        final State oldState;
 
         synchronized (stateLock) {
+            oldState = state;
+
             if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                 // when the state is already in PENDING_SHUTDOWN, all other transitions will be
                 // refused but we do not throw exception here
-                return false;
+                return null;
             } else if (state == State.DEAD) {
                 // when the state is already in NOT_RUNNING, all its transitions
                 // will be refused but we do not throw exception here
-                return false;
+                return null;
             } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
                 // when the state is already in PARTITIONS_REVOKED, its transition to itself will be
                 // refused but we do not throw exception here
-                return false;
+                return null;
             } else if (!state.isValidTransition(newState)) {
                 log.error("Unexpected state transition from {} to {}", oldState, newState);
                 throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
@@ -209,7 +212,7 @@ public class StreamThread extends Thread {
             stateListener.onChange(this, state, oldState);
         }
 
-        return true;
+        return oldState;
     }
 
     public boolean isRunningAndNotRebalancing() {
@@ -251,7 +254,7 @@ public class StreamThread extends Thread {
 
             final long start = time.milliseconds();
             try {
-                if (!streamThread.setState(State.PARTITIONS_ASSIGNED)) {
+                if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
                     return;
                 }
                 taskManager.createTasks(assignment);
@@ -281,7 +284,7 @@ public class StreamThread extends Thread {
                 taskManager.activeTaskIds(),
                 taskManager.standbyTaskIds());
 
-            if (streamThread.setState(State.PARTITIONS_REVOKED)) {
+            if (streamThread.setState(State.PARTITIONS_REVOKED) != null) {
                 final long start = time.milliseconds();
                 try {
                     // suspend active tasks
@@ -714,7 +717,10 @@ public class StreamThread extends Thread {
     @Override
     public void run() {
         log.info("Starting");
-        setState(State.RUNNING);
+        if (setState(State.RUNNING) == null) {
+            log.info("StreamThread already shutdown. Not running");
+            return;
+        }
         boolean cleanRun = false;
         try {
             runLoop();
@@ -1088,7 +1094,11 @@ public class StreamThread extends Thread {
      */
     public void shutdown() {
         log.info("Informed to shut down");
-        setState(State.PENDING_SHUTDOWN);
+        State oldState = setState(State.PENDING_SHUTDOWN);
+        if (oldState == State.CREATED) {
+            // The thread may not have been started. Take responsibility for shutting down
+            completeShutdown(true);
+        }
     }
 
     private void completeShutdown(final boolean cleanRun) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index d70c8f3..bdc1c00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -274,7 +274,11 @@ class TaskManager {
         standby.close(clean);
 
         // remove the changelog partitions from restore consumer
-        restoreConsumer.unsubscribe();
+        try {
+            restoreConsumer.unsubscribe();
+        } catch (final RuntimeException fatalException) {
+            firstException.compareAndSet(null, fatalException);
+        }
         taskCreator.close();
         standbyTaskCreator.close();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 8746c62..9770173 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
@@ -31,6 +35,7 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestCondition;
@@ -42,8 +47,10 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -114,6 +121,33 @@ public class KafkaStreamsTest {
     }
 
     @Test
+    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        List<Node> nodes = Arrays.asList(new Node(0, "localhost", 8121));
+        Cluster cluster = new Cluster("mockClusterId", nodes,
+            Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
+            Collections.<String>emptySet(), nodes.get(0));
+        MockClientSupplier clientSupplier = new MockClientSupplier();
+        clientSupplier.setClusterForAdminClient(cluster);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(props), clientSupplier);
+        streams.close();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return streams.state() == KafkaStreams.State.NOT_RUNNING;
+            }
+        }, 10 * 1000, "Streams never stopped.");
+
+        // Ensure that any created clients are closed
+        assertTrue(clientSupplier.consumer.closed());
+        assertTrue(clientSupplier.restoreConsumer.closed());
+        for (MockProducer p : clientSupplier.producers) {
+            assertTrue(p.closed());
+        }
+    }
+
+    @Test
     public void testStateThreadClose() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 6b760c1..e67fe14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -454,8 +454,13 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(mockTime,
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
+                metrics,
+                "",
+                "",
+                Collections.<String, String>emptyMap());
+        final StreamThread thread = new StreamThread(
+                mockTime,
                 config,
                 consumer,
                 consumer,
@@ -465,8 +470,73 @@ public class StreamThreadTest {
                 internalTopologyBuilder,
                 clientId,
                 new LogContext(""));
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setStateListener(
+                new StreamThread.StateListener() {
+                    @Override
+                    public void onChange(final Thread t, final ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator oldState) {
+                        if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.RUNNING) {
+                            thread.shutdown();
+                        }
+                    }
+                });
+        thread.run();
+        EasyMock.verify(taskManager);
+    }
+
+    @Test
+    public void shouldShutdownTaskManagerOnCloseWithoutStart() {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        taskManager.shutdown(true);
+        EasyMock.expectLastCall();
+        EasyMock.replay(taskManager, consumer);
+
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
+                metrics,
+                "",
+                "",
+                Collections.<String, String>emptyMap());
+        final StreamThread thread = new StreamThread(
+                mockTime,
+                config,
+                consumer,
+                consumer,
+                null,
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext(""));
+        thread.shutdown();
+        EasyMock.verify(taskManager);
+    }
+
+    @Test
+    public void shouldOnlyShutdownOnce() {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        taskManager.shutdown(true);
+        EasyMock.expectLastCall();
+        EasyMock.replay(taskManager, consumer);
+
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
+            metrics,
+            "",
+            "",
+            Collections.<String, String>emptyMap());
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            consumer,
+            consumer,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            clientId,
+            new LogContext(""));
         thread.shutdown();
+        // Execute the run method. Verification of the mock will check that shutdown was only done once
         thread.run();
         EasyMock.verify(taskManager);
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <commits@kafka.apache.org>.

Mime
View raw message