kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: Revert "KAFKA-6383: complete shutdown for CREATED StreamThreads (#4343)"
Date Tue, 02 Jan 2018 22:22:11 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2119e9f  Revert "KAFKA-6383: complete shutdown for CREATED StreamThreads (#4343)"
2119e9f is described below

commit 2119e9f26ed988746ee22d139d1eb1aca2bbb950
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Tue Jan 2 14:21:42 2018 -0800

    Revert "KAFKA-6383: complete shutdown for CREATED StreamThreads (#4343)"
    
    This reverts commit 47db063c310cf47e4c544196acab2abfe62977b0.
---
 .../streams/processor/internals/StreamThread.java  | 31 +++++-----------
 .../streams/processor/internals/TaskManager.java   |  6 +--
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 24 ------------
 .../processor/internals/StreamThreadTest.java      | 43 +---------------------
 4 files changed, 13 insertions(+), 91 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ff440cc..696081d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -173,26 +173,23 @@ public class StreamThread extends Thread {
     /**
      * Sets the state
      * @param newState New state
-     * @return The state prior to the call to setState, or null if the transition is invalid
      */
-    State setState(final State newState) {
-        final State oldState;
+    boolean setState(final State newState) {
+        final State oldState = state;
 
         synchronized (stateLock) {
-            oldState = state;
-
             if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                 // when the state is already in PENDING_SHUTDOWN, all other transitions will be
                 // refused but we do not throw exception here
-                return null;
+                return false;
             } else if (state == State.DEAD) {
                 // when the state is already in NOT_RUNNING, all its transitions
                 // will be refused but we do not throw exception here
-                return null;
+                return false;
             } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
                 // when the state is already in PARTITIONS_REVOKED, its transition to itself will be
                 // refused but we do not throw exception here
-                return null;
+                return false;
             } else if (!state.isValidTransition(newState)) {
                 log.error("Unexpected state transition from {} to {}", oldState, newState);
                 throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
@@ -212,7 +209,7 @@ public class StreamThread extends Thread {
             stateListener.onChange(this, state, oldState);
         }
 
-        return oldState;
+        return true;
     }
 
     public boolean isRunningAndNotRebalancing() {
@@ -254,7 +251,7 @@ public class StreamThread extends Thread {
 
             final long start = time.milliseconds();
             try {
-                if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
+                if (!streamThread.setState(State.PARTITIONS_ASSIGNED)) {
                     return;
                 }
                 taskManager.createTasks(assignment);
@@ -284,7 +281,7 @@ public class StreamThread extends Thread {
                 taskManager.activeTaskIds(),
                 taskManager.standbyTaskIds());
 
-            if (streamThread.setState(State.PARTITIONS_REVOKED) != null) {
+            if (streamThread.setState(State.PARTITIONS_REVOKED)) {
                 final long start = time.milliseconds();
                 try {
                     // suspend active tasks
@@ -717,11 +714,7 @@ public class StreamThread extends Thread {
     @Override
     public void run() {
         log.info("Starting");
-        if (setState(State.RUNNING) == null) {
-            log.info("StreamThread already shutdown. Not running");
-            completeShutdown(true);
-            return;
-        }
+        setState(State.RUNNING);
         boolean cleanRun = false;
         try {
             runLoop();
@@ -1095,11 +1088,7 @@ public class StreamThread extends Thread {
      */
     public void shutdown() {
         log.info("Informed to shut down");
-        State oldState = setState(State.PENDING_SHUTDOWN);
-        if (oldState == State.CREATED) {
-            // Start so that we shutdown on the thread
-            this.start();
-        }
+        setState(State.PENDING_SHUTDOWN);
     }
 
     private void completeShutdown(final boolean cleanRun) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bdc1c00..d70c8f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -274,11 +274,7 @@ class TaskManager {
         standby.close(clean);
 
         // remove the changelog partitions from restore consumer
-        try {
-            restoreConsumer.unsubscribe();
-        } catch (final RuntimeException fatalException) {
-            firstException.compareAndSet(null, fatalException);
-        }
+        restoreConsumer.unsubscribe();
         taskCreator.close();
         standbyTaskCreator.close();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index a2084b0..8746c62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.ConfigException;
@@ -32,7 +31,6 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestCondition;
@@ -116,28 +114,6 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
-        final StreamsBuilder builder = new StreamsBuilder();
-        builder.globalTable("anyTopic");
-        MockClientSupplier clientSupplier = new MockClientSupplier();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(props), clientSupplier);
-        streams.close();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return streams.state() == KafkaStreams.State.NOT_RUNNING;
-            }
-        }, 10 * 1000, "Streams never stopped.");
-
-        // Ensure that any created clients are closed
-        assertTrue(clientSupplier.consumer.closed());
-        assertTrue(clientSupplier.restoreConsumer.closed());
-        for (MockProducer p : clientSupplier.producers) {
-            assertTrue(p.closed());
-        }
-    }
-
-    @Test
     public void testStateThreadClose() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index cca7045..4250465 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -441,13 +441,8 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
-                metrics,
-                "",
-                "",
-                Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(
-                mockTime,
+        StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
+        final StreamThread thread = new StreamThread(mockTime,
                 config,
                 consumer,
                 consumer,
@@ -464,40 +459,6 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldShutdownTaskManagerOnCloseWithoutStart() {
-        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
-        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
-        taskManager.shutdown(true);
-        EasyMock.expectLastCall();
-        EasyMock.replay(taskManager, consumer);
-
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
-                metrics,
-                "",
-                "",
-                Collections.<String, String>emptyMap());
-        final StreamThread thread = new StreamThread(
-                mockTime,
-                config,
-                consumer,
-                consumer,
-                null,
-                taskManager,
-                streamsMetrics,
-                internalTopologyBuilder,
-                clientId,
-                new LogContext(""));
-        thread.shutdown();
-        try {
-            thread.join(1000);
-        } catch (final InterruptedException e) {
-            fail("Join interrupted");
-        }
-        assertFalse(thread.isAlive());
-        EasyMock.verify(taskManager);
-    }
-
-    @Test
     public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
         internalTopologyBuilder.addSink("out", "output", null, null, null);

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <commits@kafka.apache.org>.

Mime
View raw message