kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6383: complete shutdown for CREATED StreamThreads (#4343)
Date Tue, 02 Jan 2018 17:32:27 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 47db063  KAFKA-6383: complete shutdown for CREATED StreamThreads (#4343)
47db063 is described below

commit 47db063c310cf47e4c544196acab2abfe62977b0
Author: Rohan <desai.p.rohan@gmail.com>
AuthorDate: Tue Jan 2 09:32:17 2018 -0800

    KAFKA-6383: complete shutdown for CREATED StreamThreads (#4343)
    
    * KAFKA-6383: complete shutdown for CREATED StreamThreads
    
    When transitioning StreamThread from CREATED to PENDING_SHUTDOWN,
    free up resources from the caller, rather than the stream thread,
    since in this case the stream thread was never actually started.
    
    Have StreamThread.setState return the old state. If the old state is
    CREATED in StreamThread.shutdown() then start the thread so that it
    can free the resources owned by the StreamThread.
    
    Add a KafkaStreams test to verify that the producer gets closed even
    if KafkaStreams was not started.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/processor/internals/StreamThread.java  | 31 +++++++++++-----
 .../streams/processor/internals/TaskManager.java   |  6 ++-
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 24 ++++++++++++
 .../processor/internals/StreamThreadTest.java      | 43 +++++++++++++++++++++-
 4 files changed, 91 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 696081d..ff440cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -173,23 +173,26 @@ public class StreamThread extends Thread {
     /**
      * Sets the state
      * @param newState New state
+     * @return The state prior to the call to setState, or null if the transition is invalid
      */
-    boolean setState(final State newState) {
-        final State oldState = state;
+    State setState(final State newState) {
+        final State oldState;
 
         synchronized (stateLock) {
+            oldState = state;
+
             if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                 // when the state is already in PENDING_SHUTDOWN, all other transitions will be
                 // refused but we do not throw exception here
-                return false;
+                return null;
             } else if (state == State.DEAD) {
                 // when the state is already in NOT_RUNNING, all its transitions
                 // will be refused but we do not throw exception here
-                return false;
+                return null;
             } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
                 // when the state is already in PARTITIONS_REVOKED, its transition to itself will be
                 // refused but we do not throw exception here
-                return false;
+                return null;
             } else if (!state.isValidTransition(newState)) {
                 log.error("Unexpected state transition from {} to {}", oldState, newState);
                 throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
@@ -209,7 +212,7 @@ public class StreamThread extends Thread {
             stateListener.onChange(this, state, oldState);
         }
 
-        return true;
+        return oldState;
     }
 
     public boolean isRunningAndNotRebalancing() {
@@ -251,7 +254,7 @@ public class StreamThread extends Thread {
 
             final long start = time.milliseconds();
             try {
-                if (!streamThread.setState(State.PARTITIONS_ASSIGNED)) {
+                if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
                     return;
                 }
                 taskManager.createTasks(assignment);
@@ -281,7 +284,7 @@ public class StreamThread extends Thread {
                 taskManager.activeTaskIds(),
                 taskManager.standbyTaskIds());
 
-            if (streamThread.setState(State.PARTITIONS_REVOKED)) {
+            if (streamThread.setState(State.PARTITIONS_REVOKED) != null) {
                 final long start = time.milliseconds();
                 try {
                     // suspend active tasks
@@ -714,7 +717,11 @@ public class StreamThread extends Thread {
     @Override
     public void run() {
         log.info("Starting");
-        setState(State.RUNNING);
+        if (setState(State.RUNNING) == null) {
+            log.info("StreamThread already shutdown. Not running");
+            completeShutdown(true);
+            return;
+        }
         boolean cleanRun = false;
         try {
             runLoop();
@@ -1088,7 +1095,11 @@ public class StreamThread extends Thread {
      */
     public void shutdown() {
         log.info("Informed to shut down");
-        setState(State.PENDING_SHUTDOWN);
+        State oldState = setState(State.PENDING_SHUTDOWN);
+        if (oldState == State.CREATED) {
+            // Start so that we shutdown on the thread
+            this.start();
+        }
     }
 
     private void completeShutdown(final boolean cleanRun) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index d70c8f3..bdc1c00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -274,7 +274,11 @@ class TaskManager {
         standby.close(clean);
 
         // remove the changelog partitions from restore consumer
-        restoreConsumer.unsubscribe();
+        try {
+            restoreConsumer.unsubscribe();
+        } catch (final RuntimeException fatalException) {
+            firstException.compareAndSet(null, fatalException);
+        }
         taskCreator.close();
         standbyTaskCreator.close();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 8746c62..a2084b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.ConfigException;
@@ -31,6 +32,7 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestCondition;
@@ -114,6 +116,28 @@ public class KafkaStreamsTest {
     }
 
     @Test
+    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        MockClientSupplier clientSupplier = new MockClientSupplier();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(props), clientSupplier);
+        streams.close();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return streams.state() == KafkaStreams.State.NOT_RUNNING;
+            }
+        }, 10 * 1000, "Streams never stopped.");
+
+        // Ensure that any created clients are closed
+        assertTrue(clientSupplier.consumer.closed());
+        assertTrue(clientSupplier.restoreConsumer.closed());
+        for (MockProducer p : clientSupplier.producers) {
+            assertTrue(p.closed());
+        }
+    }
+
+    @Test
     public void testStateThreadClose() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8bcd6fb..fca380f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -438,8 +438,13 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(mockTime,
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
+                metrics,
+                "",
+                "",
+                Collections.<String, String>emptyMap());
+        final StreamThread thread = new StreamThread(
+                mockTime,
                 config,
                 consumer,
                 consumer,
@@ -456,6 +461,40 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldShutdownTaskManagerOnCloseWithoutStart() {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        taskManager.shutdown(true);
+        EasyMock.expectLastCall();
+        EasyMock.replay(taskManager, consumer);
+
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
+                metrics,
+                "",
+                "",
+                Collections.<String, String>emptyMap());
+        final StreamThread thread = new StreamThread(
+                mockTime,
+                config,
+                consumer,
+                consumer,
+                null,
+                taskManager,
+                streamsMetrics,
+                internalTopologyBuilder,
+                clientId,
+                new LogContext(""));
+        thread.shutdown();
+        try {
+            thread.join(1000);
+        } catch (final InterruptedException e) {
+            fail("Join interrupted");
+        }
+        assertFalse(thread.isAlive());
+        EasyMock.verify(taskManager);
+    }
+
+    @Test
     public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
         internalTopologyBuilder.addSink("out", "output", null, null, null);

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <commits@kafka.apache.org>.

Mime
View raw message