kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: K7657 handling thread dead state change (#6091)
Date Sat, 05 Jan 2019 05:28:06 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8e4799b  K7657 handling thread dead state change (#6091)
8e4799b is described below

commit 8e4799b017c6a3c65eaf47f60ab136654c9741de
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Fri Jan 4 21:27:50 2019 -0800

    K7657 handling thread dead state change (#6091)
    
    While looking into KAFKA-7657, I found there are a few loopholes in this logic:
    
    We kept a map of thread-name to thread-state and a global-thread state at the KafkaStreams
instance-level, in addition to the instance state itself. stateLock is used when accessing
the instance state, however when we are in the thread state change callback, we are accessing
both the thread-states as well as the instance state at the same time in the callers of setState
without a lock, which is vulnerable to concurrent multi-stream threads. The fix is a) introduce
a threadStatesLock i [...]
    
    When transiting to state.RUNNING, we check if all threads are either in RUNNING or DEAD
state, this is because some threads maybe dead at the rebalance period but we should still
proceed to RUNNING if the rest of threads are still transiting to RUNNING.
    
    Added unit test for 2) above. Also simplified another test as a nit change.
    
    Reviewers: John Roesler <vvcephei@users.noreply.github.com>, Matthias J. Sax <mjsax@apache.org>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 174 ++++++++++-----------
 .../streams/processor/internals/StreamThread.java  |  67 ++++----
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  75 +++++++--
 .../StreamTableJoinIntegrationTest.java            |   2 +-
 .../integration/utils/IntegrationTestUtils.java    |  10 +-
 .../processor/internals/StreamThreadTest.java      |  50 +++---
 6 files changed, 212 insertions(+), 166 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bbd0105..c50c5d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -159,24 +159,24 @@ public class KafkaStreams implements AutoCloseable {
      *         |       +-----+--------+
      *         |             |
      *         |             v
-     *         |       +--------------+
-     *         +<----- | Running (2)  | -------->+
-     *         |       +----+--+------+          |
-     *         |            |  ^                 |
-     *         |            v  |                 |
-     *         |       +----+--+------+          |
-     *         |       | Re-          |          v
-     *         |       | Balancing (1)| -------->+
-     *         |       +-----+--------+          |
-     *         |             |                   |
-     *         |             v                   v
-     *         |       +-----+--------+     +----+-------+
+     *         |       +----+--+------+
+     *         |       | Re-          |
+     *         +<----- | Balancing (1)| -------->+
+     *         |       +-----+-+------+          |
+     *         |             | ^                 |
+     *         |             v |                 |
+     *         |       +--------------+          v
+     *         |       | Running (2)  | -------->+
+     *         |       +------+-------+          |
+     *         |              |                  |
+     *         |              v                  |
+     *         |       +------+-------+     +----+-------+
      *         +-----> | Pending      |<--- | Error (5)  |
      *                 | Shutdown (3) |     +------------+
-     *                 +-----+--------+
-     *                       |
-     *                       v
-     *                 +-----+--------+
+     *                 +------+-------+
+     *                        |
+     *                        v
+     *                 +------+-------+
      *                 | Not          |
      *                 | Running (4)  |
      *                 +--------------+
@@ -191,7 +191,7 @@ public class KafkaStreams implements AutoCloseable {
      *   the instance will be in the ERROR state. The user will need to close it.
      */
     public enum State {
-        CREATED(2, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING,
ERROR(3);
+        CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING,
ERROR(3, 5);
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
@@ -238,9 +238,11 @@ public class KafkaStreams implements AutoCloseable {
      * @param newState New state
      */
     private boolean setState(final State newState) {
-        final State oldState = state;
+        final State oldState;
 
         synchronized (stateLock) {
+            oldState = state;
+
             if (state == State.PENDING_SHUTDOWN && newState != State.NOT_RUNNING)
{
                 // when the state is already in PENDING_SHUTDOWN, all other transitions than
NOT_RUNNING (due to thread dying) will be
                 // refused but we do not throw exception here, to allow appropriate error
handling
@@ -249,6 +251,12 @@ public class KafkaStreams implements AutoCloseable {
                 // when the state is already in NOT_RUNNING, its transition to PENDING_SHUTDOWN
or NOT_RUNNING (due to consecutive close calls)
                 // will be refused but we do not throw exception here, to allow idempotent
close calls
                 return false;
+            } else if (state == State.REBALANCING && newState == State.REBALANCING)
{
+                // when the state is already in REBALANCING, it should not transit to REBALANCING
again
+                return false;
+            } else if (state == State.ERROR && newState == State.ERROR) {
+                // when the state is already in ERROR, it should not transit to ERROR again
+                return false;
             } else if (!state.isValidTransition(newState)) {
                 throw new IllegalStateException("Stream-client " + clientId + ": Unexpected
state transition from " + oldState + " to " + newState);
             } else {
@@ -260,24 +268,7 @@ public class KafkaStreams implements AutoCloseable {
 
         // we need to call the user customized state listener outside the state lock to avoid
potential deadlocks
         if (stateListener != null) {
-            stateListener.onChange(state, oldState);
-        }
-
-        return true;
-    }
-
-    private boolean setRunningFromCreated() {
-        synchronized (stateLock) {
-            if (state != State.CREATED) {
-                throw new IllegalStateException("Stream-client " + clientId + ": Unexpected
state transition from " + state + " to " + State.RUNNING);
-            }
-            state = State.RUNNING;
-            stateLock.notifyAll();
-        }
-
-        // we need to call the user customized state listener outside the state lock to avoid
potential deadlocks
-        if (stateListener != null) {
-            stateListener.onChange(State.RUNNING, State.CREATED);
+            stateListener.onChange(newState, oldState);
         }
 
         return true;
@@ -324,11 +315,12 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state
{@link State#CREATED CREATED}.
      */
     public void setStateListener(final KafkaStreams.StateListener listener) {
-        if (state == State.CREATED) {
-            stateListener = listener;
-        } else {
-            throw new IllegalStateException("Can only set StateListener in CREATED state.
" +
-                    "Current state is: " + state);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                stateListener = listener;
+            } else {
+                throw new IllegalStateException("Can only set StateListener in CREATED state.
Current state is: " + state);
+            }
         }
     }
 
@@ -340,17 +332,19 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state
{@link State#CREATED CREATED}.
      */
     public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
-        if (state == State.CREATED) {
-            for (final StreamThread thread : threads) {
-                thread.setUncaughtExceptionHandler(eh);
-            }
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                for (final StreamThread thread : threads) {
+                    thread.setUncaughtExceptionHandler(eh);
+                }
 
-            if (globalStreamThread != null) {
-                globalStreamThread.setUncaughtExceptionHandler(eh);
-            }
-        } else {
-            throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED
state. " +
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(eh);
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in
CREATED state. " +
                     "Current state is: " + state);
+            }
         }
     }
 
@@ -362,11 +356,13 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state
{@link State#CREATED CREATED}.
      */
     public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener)
{
-        if (state == State.CREATED) {
-            this.globalStateRestoreListener = globalStateRestoreListener;
-        } else {
-            throw new IllegalStateException("Can only set GlobalStateRestoreListener in CREATED
state. " +
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                this.globalStateRestoreListener = globalStateRestoreListener;
+            } else {
+                throw new IllegalStateException("Can only set GlobalStateRestoreListener
in CREATED state. " +
                     "Current state is: " + state);
+            }
         }
     }
 
@@ -396,18 +392,21 @@ public class KafkaStreams implements AutoCloseable {
     final class StreamStateListener implements StreamThread.StateListener {
         private final Map<Long, StreamThread.State> threadState;
         private GlobalStreamThread.State globalThreadState;
+        // this lock should always be held before the state lock
+        private final Object threadStatesLock;
 
         StreamStateListener(final Map<Long, StreamThread.State> threadState,
                             final GlobalStreamThread.State globalThreadState) {
             this.threadState = threadState;
             this.globalThreadState = globalThreadState;
+            this.threadStatesLock = new Object();
         }
 
         /**
          * If all threads are dead set to ERROR
          */
         private void maybeSetError() {
-            // check if we have enough threads running
+            // check if we have at least one thread running
             for (final StreamThread.State state : threadState.values()) {
                 if (state != StreamThread.State.DEAD) {
                     return;
@@ -415,7 +414,7 @@ public class KafkaStreams implements AutoCloseable {
             }
 
             if (setState(State.ERROR)) {
-                log.warn("All stream threads have died. The instance will be in error state
and should be closed.");
+                log.error("All stream threads have died. The instance will be in error state
and should be closed.");
             }
         }
 
@@ -423,12 +422,13 @@ public class KafkaStreams implements AutoCloseable {
          * If all threads are up, including the global thread, set to RUNNING
          */
         private void maybeSetRunning() {
-            // one thread is running, check others, including global thread
+            // state can be transferred to RUNNING if all threads are either RUNNING or DEAD
             for (final StreamThread.State state : threadState.values()) {
-                if (state != StreamThread.State.RUNNING) {
+                if (state != StreamThread.State.RUNNING && state != StreamThread.State.DEAD)
{
                     return;
                 }
             }
+
             // the global state thread is relevant only if it is started. There are cases
             // when we don't have a global state thread at all, e.g., when we don't have
global KTables
             if (globalThreadState != null && globalThreadState != GlobalStreamThread.State.RUNNING)
{
@@ -443,26 +443,30 @@ public class KafkaStreams implements AutoCloseable {
         public synchronized void onChange(final Thread thread,
                                           final ThreadStateTransitionValidator abstractNewState,
                                           final ThreadStateTransitionValidator abstractOldState)
{
-            // StreamThreads first
-            if (thread instanceof StreamThread) {
-                final StreamThread.State newState = (StreamThread.State) abstractNewState;
-                threadState.put(thread.getId(), newState);
-
-                if (newState == StreamThread.State.PARTITIONS_REVOKED && state !=
State.REBALANCING) {
-                    setState(State.REBALANCING);
-                } else if (newState == StreamThread.State.RUNNING && state != State.RUNNING)
{
-                    maybeSetRunning();
-                } else if (newState == StreamThread.State.DEAD && state != State.ERROR)
{
-                    maybeSetError();
-                }
-            } else if (thread instanceof GlobalStreamThread) {
-                // global stream thread has different invariants
-                final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
-                globalThreadState = newState;
-
-                // special case when global thread is dead
-                if (newState == GlobalStreamThread.State.DEAD && state != State.ERROR
&& setState(State.ERROR)) {
-                    log.warn("Global thread has died. The instance will be in error state
and should be closed.");
+            synchronized (threadStatesLock) {
+                // StreamThreads first
+                if (thread instanceof StreamThread) {
+                    final StreamThread.State newState = (StreamThread.State) abstractNewState;
+                    threadState.put(thread.getId(), newState);
+
+                    if (newState == StreamThread.State.PARTITIONS_REVOKED) {
+                        setState(State.REBALANCING);
+                    } else if (newState == StreamThread.State.RUNNING) {
+                        maybeSetRunning();
+                    } else if (newState == StreamThread.State.DEAD) {
+                        maybeSetError();
+                    }
+                } else if (thread instanceof GlobalStreamThread) {
+                    // global stream thread has different invariants
+                    final GlobalStreamThread.State newState = (GlobalStreamThread.State)
abstractNewState;
+                    globalThreadState = newState;
+
+                    // special case when global thread is dead
+                    if (newState == GlobalStreamThread.State.DEAD) {
+                        if (setState(State.ERROR)) {
+                            log.error("Global thread has died. The instance will be in error
state and should be closed.");
+                        }
+                    }
                 }
             }
         }
@@ -773,11 +777,9 @@ public class KafkaStreams implements AutoCloseable {
      *                          if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG exactly-once}
is enabled for pre 0.11.0.x brokers
      */
     public synchronized void start() throws IllegalStateException, StreamsException {
-        log.debug("Starting Streams client");
+        if (setState(State.REBALANCING)) {
+            log.debug("Starting Streams client");
 
-        // first set state to RUNNING before kicking off the threads,
-        // making sure the state will always transit to RUNNING before REBALANCING
-        if (setRunningFromCreated()) {
             if (globalStreamThread != null) {
                 globalStreamThread.start();
             }
@@ -788,17 +790,13 @@ public class KafkaStreams implements AutoCloseable {
 
             final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
             stateDirCleaner.scheduleAtFixedRate(() -> {
+                // we do not use lock here since we only read on the value and act on it
                 if (state == State.RUNNING) {
                     stateDirectory.cleanRemovedTasks(cleanupDelay);
                 }
             }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
-
-            log.info("Started Streams client");
         } else {
-            // if transition failed but no exception is thrown; currently it is not possible
-            // since we do not allow calling start multiple times whether or not it is already
shutdown.
-            // TODO: In the future if we lift this restriction this code path could then
be triggered and be updated
-            log.error("Already stopped, cannot re-start");
+            throw new IllegalStateException("The client is either already started or already
stopped, cannot re-start");
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 6c3e7cd..a8db7e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -83,30 +83,36 @@ public class StreamThread extends Thread {
      *          |           |
      *          |           v
      *          |     +-----+-------+
-     *          +<--- | Running (1) | <----+
+     *          +<--- | Starting (1)|
+     *          |     +-----+-------+
+     *          |           |
+     *          |           |
+     *          |           v
+     *          |     +-----+-------+
+     *          +<--- | Partitions  |
+     *          |     | Revoked (2) | <----+
      *          |     +-----+-------+      |
      *          |           |              |
      *          |           v              |
      *          |     +-----+-------+      |
-     *          +<--- | Partitions  |      |
-     *          |     | Revoked (2) | <----+
+     *          |     | Partitions  |      |
+     *          +<--- | Assigned (3)| ---->+
      *          |     +-----+-------+      |
      *          |           |              |
      *          |           v              |
      *          |     +-----+-------+      |
-     *          |     | Partitions  |      |
-     *          |     | Assigned (3)| ---->+
+     *          |     | Running (4) | ---->+
      *          |     +-----+-------+
      *          |           |
      *          |           v
      *          |     +-----+-------+
      *          +---> | Pending     |
-     *                | Shutdown (4)|
+     *                | Shutdown (5)|
      *                +-----+-------+
      *                      |
      *                      v
      *                +-----+-------+
-     *                | Dead (5)    |
+     *                | Dead (6)    |
      *                +-------------+
      * </pre>
      *
@@ -121,12 +127,13 @@ public class StreamThread extends Thread {
      *     <li>
      *         State PARTITIONS_REVOKED may want transit to itself indefinitely, in the corner
case when
      *         the coordinator repeatedly fails in-between revoking partitions and assigning
new partitions.
+     *         Also during streams instance start up PARTITIONS_REVOKED may want to transit
to itself as well.
      *         In this case we will forbid the transition but will not treat as an error.
      *     </li>
      * </ul>
      */
     public enum State implements ThreadStateTransitionValidator {
-        CREATED(1, 4), RUNNING(2, 4), PARTITIONS_REVOKED(3, 4), PARTITIONS_ASSIGNED(1, 2,
4), PENDING_SHUTDOWN(5), DEAD;
+        CREATED(1, 5), STARTING(2, 5), PARTITIONS_REVOKED(3, 5), PARTITIONS_ASSIGNED(2, 4,
5), RUNNING(2, 5), PENDING_SHUTDOWN(6), DEAD;
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
@@ -135,7 +142,7 @@ public class StreamThread extends Thread {
         }
 
         public boolean isRunning() {
-            return equals(RUNNING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED);
+            return equals(RUNNING) || equals(STARTING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED);
         }
 
         @Override
@@ -738,7 +745,7 @@ public class StreamThread extends Thread {
     @Override
     public void run() {
         log.info("Starting");
-        if (setState(State.RUNNING) == null) {
+        if (setState(State.STARTING) == null) {
             log.info("StreamThread already shutdown. Not running");
             return;
         }
@@ -816,7 +823,7 @@ public class StreamThread extends Thread {
             // try to fetch some records with normal poll time
             // in order to wait long enough to get the join response
             records = pollRequests(pollTime);
-        } else if (state == State.RUNNING) {
+        } else if (state == State.RUNNING || state == State.STARTING) {
             // try to fetch some records with normal poll time
             // in order to get long polling
             records = pollRequests(pollTime);
@@ -1249,23 +1256,6 @@ public class StreamThread extends Thread {
         return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent);
     }
 
-    // the following are for testing only
-    void setNow(final long now) {
-        this.now = now;
-    }
-
-    TaskManager taskManager() {
-        return taskManager;
-    }
-
-    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords()
{
-        return standbyRecords;
-    }
-
-    int currentNumIterations() {
-        return numIterations;
-    }
-
     public Map<MetricName, Metric> producerMetrics() {
         final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
         if (producer != null) {
@@ -1300,4 +1290,25 @@ public class StreamThread extends Thread {
         result.putAll(adminClientMetrics);
         return result;
     }
+
+    // the following are for testing only
+    void setNow(final long now) {
+        this.now = now;
+    }
+
+    TaskManager taskManager() {
+        return taskManager;
+    }
+
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords()
{
+        return standbyRecords;
+    }
+
+    int currentNumIterations() {
+        return numIterations;
+    }
+
+    public StreamThread.StateListener stateListener() {
+        return stateListener;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index d2a4ace..89f3730 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -139,36 +139,77 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testStateChanges() throws InterruptedException {
+    public void testStateCloseAfterCreate() {
+        globalStreams.close();
+
+        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
+    }
+
+    @Test
+    public void testStateOneThreadDeadButRebalanceFinish() throws InterruptedException {
         final StateListenerStub stateListener = new StateListenerStub();
         globalStreams.setStateListener(stateListener);
 
-        Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());
         Assert.assertEquals(0, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());
 
         globalStreams.start();
+
         TestUtils.waitForCondition(
-            () -> globalStreams.state() == KafkaStreams.State.RUNNING,
+            () -> stateListener.numChanges == 2,
             10 * 1000,
             "Streams never started.");
+        Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());
 
-        globalStreams.close();
+        for (final StreamThread thread: globalStreams.threads) {
+            thread.stateListener().onChange(
+                thread,
+                StreamThread.State.PARTITIONS_REVOKED,
+                StreamThread.State.RUNNING);
+        }
 
-        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
-    }
+        Assert.assertEquals(3, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
 
-    @Test
-    public void testStateCloseAfterCreate() {
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        for (final StreamThread thread: globalStreams.threads) {
+            thread.stateListener().onChange(
+                thread,
+                StreamThread.State.PARTITIONS_ASSIGNED,
+                StreamThread.State.PARTITIONS_REVOKED);
+        }
 
-        try {
-            final StateListenerStub stateListener = new StateListenerStub();
-            streams.setStateListener(stateListener);
-        } finally {
-            streams.close();
+        Assert.assertEquals(3, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
+
+        globalStreams.threads[NUM_THREADS - 1].stateListener().onChange(
+            globalStreams.threads[NUM_THREADS - 1],
+            StreamThread.State.PENDING_SHUTDOWN,
+            StreamThread.State.PARTITIONS_ASSIGNED);
+
+        globalStreams.threads[NUM_THREADS - 1].stateListener().onChange(
+            globalStreams.threads[NUM_THREADS - 1],
+            StreamThread.State.DEAD,
+            StreamThread.State.PENDING_SHUTDOWN);
+
+        Assert.assertEquals(3, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
+
+        for (final StreamThread thread: globalStreams.threads) {
+            if (thread != globalStreams.threads[NUM_THREADS - 1]) {
+                thread.stateListener().onChange(
+                    thread,
+                    StreamThread.State.RUNNING,
+                    StreamThread.State.PARTITIONS_ASSIGNED);
+            }
         }
 
-        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
+        Assert.assertEquals(4, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());
+
+        globalStreams.close();
+
+        Assert.assertEquals(6, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
     }
 
     @Test
@@ -499,8 +540,8 @@ public class KafkaStreamsTest {
         assertNotNull(threadMetadata);
         assertEquals(2, threadMetadata.size());
         for (final ThreadMetadata metadata : threadMetadata) {
-            assertTrue("#threadState() was: " + metadata.threadState() + "; expected either
RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
-                asList("RUNNING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState()));
+            assertTrue("#threadState() was: " + metadata.threadState() + "; expected either
RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
+                asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED",
"CREATED").contains(metadata.threadState()));
             assertEquals(0, metadata.standbyTasks().size());
             assertEquals(0, metadata.activeTasks().size());
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 7503dd6..ca78d02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -82,7 +82,7 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
         TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, "Did not seen
thread state transited to PENDING_SHUTDOWN");
 
         streams.close();
-        assertEquals(listener.runningToRevokedSeen(), true);
+        assertEquals(listener.createdToRevokedSeen(), true);
         assertEquals(listener.revokedToPendingShutdownSeen(), true);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 8a2122d..1097285 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -75,14 +75,14 @@ public class IntegrationTestUtils {
      * Records state transition for StreamThread
      */
     public static class StateListenerStub implements StreamThread.StateListener {
-        boolean runningToRevokedSeen = false;
+        boolean startingToRevokedSeen = false;
         boolean revokedToPendingShutdownSeen = false;
         @Override
         public void onChange(final Thread thread,
                              final ThreadStateTransitionValidator newState,
                              final ThreadStateTransitionValidator oldState) {
-            if (oldState == StreamThread.State.RUNNING && newState == StreamThread.State.PARTITIONS_REVOKED)
{
-                runningToRevokedSeen = true;
+            if (oldState == StreamThread.State.STARTING && newState == StreamThread.State.PARTITIONS_REVOKED)
{
+                startingToRevokedSeen = true;
             } else if (oldState == StreamThread.State.PARTITIONS_REVOKED && newState
== StreamThread.State.PENDING_SHUTDOWN) {
                 revokedToPendingShutdownSeen = true;
             }
@@ -92,8 +92,8 @@ public class IntegrationTestUtils {
             return revokedToPendingShutdownSeen;
         }
 
-        public boolean runningToRevokedSeen() {
-            return runningToRevokedSeen;
+        public boolean createdToRevokedSeen() {
+            return startingToRevokedSeen;
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5df8fbc..dd311fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -163,12 +163,12 @@ public class StreamThreadTest {
         assertEquals(thread.state(), StreamThread.State.CREATED);
 
         final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-        thread.setState(StreamThread.State.RUNNING);
 
         final List<TopicPartition> revokedPartitions;
         final List<TopicPartition> assignedPartitions;
 
         // revoke nothing
+        thread.setState(StreamThread.State.STARTING);
         revokedPartitions = Collections.emptyList();
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
 
@@ -202,7 +202,7 @@ public class StreamThreadTest {
         TestUtils.waitForCondition(new TestCondition() {
             @Override
             public boolean conditionMet() {
-                return thread.state() == StreamThread.State.RUNNING;
+                return thread.state() == StreamThread.State.STARTING;
             }
         }, 10 * 1000, "Thread never started.");
 
@@ -342,7 +342,7 @@ public class StreamThreadTest {
             properties));
         final StreamThread thread = createStreamThread(clientId, config, false);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
@@ -505,8 +505,8 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, false);
 
-        thread.setState(StreamThread.State.RUNNING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.setState(StreamThread.State.STARTING);
+        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -542,7 +542,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)),
true);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -577,7 +577,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)),
true);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -612,8 +612,6 @@ public class StreamThreadTest {
     public void shouldShutdownTaskManagerOnClose() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
-        EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.<TaskId, StreamTask>emptyMap());
-        EasyMock.expect(taskManager.standbyTasks()).andReturn(Collections.<TaskId, StandbyTask>emptyMap());
         taskManager.shutdown(true);
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
@@ -637,7 +635,7 @@ public class StreamThreadTest {
             new StreamThread.StateListener() {
                 @Override
                 public void onChange(final Thread t, final ThreadStateTransitionValidator
newState, final ThreadStateTransitionValidator oldState) {
-                    if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.RUNNING)
{
+                    if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING)
{
                         thread.shutdown();
                     }
                 }
@@ -711,8 +709,8 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, false);
 
-        thread.setState(StreamThread.State.RUNNING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.setState(StreamThread.State.STARTING);
+        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
 
@@ -736,7 +734,7 @@ public class StreamThreadTest {
 
         consumer.updatePartitions(topic1, singletonList(new PartitionInfo(topic1, 1, null,
null, null)));
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -804,7 +802,7 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "name", null, null, null, topic1);
         internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -840,7 +838,7 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "name", null, null, null, topic1);
         internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -896,8 +894,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, false);
 
-        thread.setState(StreamThread.State.RUNNING);
-
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -946,8 +943,7 @@ public class StreamThreadTest {
         restoreConsumer.updateEndOffsets(offsets);
         restoreConsumer.updateBeginningOffsets(offsets);
 
-        thread.setState(StreamThread.State.RUNNING);
-
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@@ -1010,8 +1006,7 @@ public class StreamThreadTest {
             restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1, i, ("K"
+ i).getBytes(), ("V" + i).getBytes()));
         }
 
-        thread.setState(StreamThread.State.RUNNING);
-
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@@ -1074,8 +1069,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, false);
 
-        thread.setState(StreamThread.State.RUNNING);
-
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(null);
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
@@ -1121,6 +1115,9 @@ public class StreamThreadTest {
         ThreadMetadata metadata = thread.threadMetadata();
         assertEquals(StreamThread.State.CREATED.name(), metadata.threadState());
 
+        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+        thread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
         thread.setState(StreamThread.State.RUNNING);
         metadata = thread.threadMetadata();
         assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
@@ -1155,10 +1152,9 @@ public class StreamThreadTest {
 
         clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
 
-        thread.setState(StreamThread.State.RUNNING);
-
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
+        thread.setState(StreamThread.State.STARTING);
         thread.rebalanceListener.onPartitionsRevoked(assignedPartitions);
         assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
 
@@ -1287,7 +1283,7 @@ public class StreamThreadTest {
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config),
false);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
@@ -1331,7 +1327,7 @@ public class StreamThreadTest {
         config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName());
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config),
false);
 
-        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.STARTING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);


Mime
View raw message