kafka-commits mailing list archives

From damian...@apache.org
Subject kafka git commit: HOTFIX: state transition cherry picking for 0.10.2
Date Thu, 17 Aug 2017 07:54:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 991ef411d -> 517bcc491


HOTFIX: state transition cherry picking for 0.10.2

- Cherry-picked from #3432
- Minor checkstyle fixes to get it to build

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3623 from enothereska/KAFKA-5571-0.10.2
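
For context, the most user-visible change in this cherry-pick is the new ERROR state on KafkaStreams:
if the global stream thread dies, or all stream threads die, the instance moves to ERROR and must be
closed by the caller. Below is a minimal usage sketch showing how an application could observe that
through the public setStateListener API; topic names and config values are illustrative only, and,
per the new javadoc in this patch, close() must not be called from inside the onChange callback itself.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class StateListenerExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-listener-example"); // illustrative value
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // illustrative value

        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");                         // hypothetical topics

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.setStateListener(new KafkaStreams.StateListener() {
            @Override
            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
                // With this patch, a dead global thread or all stream threads dying surfaces here as
                // ERROR; react by flagging/scheduling a shutdown elsewhere, not by calling close() in-line.
                if (newState == KafkaStreams.State.ERROR) {
                    System.err.println("Streams instance entered ERROR state and should be closed.");
                }
            }
        });
        streams.start();
    }
}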


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/517bcc49
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/517bcc49
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/517bcc49

Branch: refs/heads/0.10.2
Commit: 517bcc49161b81521819c718616279a243814458
Parents: 991ef41
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Thu Aug 17 08:54:41 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Thu Aug 17 08:54:41 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 356 +++++++++++++------
 .../org/apache/kafka/streams/StreamsConfig.java |   6 +-
 .../kstream/internals/SessionKeySerde.java      |   6 +-
 .../streams/processor/TopologyBuilder.java      |   2 +-
 .../processor/internals/GlobalStreamThread.java | 131 ++++++-
 .../processor/internals/StreamThread.java       | 153 +++++---
 .../ThreadStateTransitionValidator.java         |  24 ++
 .../apache/kafka/streams/KafkaStreamsTest.java  | 173 ++++++++-
 .../KStreamAggregationDedupIntegrationTest.java |  12 +-
 .../KStreamAggregationIntegrationTest.java      |  24 +-
 .../internals/GlobalStreamThreadTest.java       |  47 ++-
 .../processor/internals/StreamThreadTest.java   | 116 ++++--
 .../internals/StreamsMetadataStateTest.java     |   2 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   2 +-
 14 files changed, 818 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index cd0e861..af29d12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -47,6 +47,7 @@ import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.StreamsMetadata;
@@ -74,6 +75,10 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
+import static org.apache.kafka.streams.KafkaStreams.State.NOT_RUNNING;
+import static org.apache.kafka.streams.KafkaStreams.State.PENDING_SHUTDOWN;
+import static org.apache.kafka.streams.KafkaStreams.State.RUNNING;
 
 /**
  * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -150,9 +155,9 @@ public class KafkaStreams {
      *         |       +-----+--------+
      *         |             |
      *         |             v
-     *         |       +-----+--------+
-     *         +<----- | Rebalancing  | <----+
-     *         |       +--------------+      |
+     *         |       +-----+--------+ <-+
+     *         +<----- | Rebalancing  | --+
+     *         |       +--------------+ <----+
      *         |                             |
      *         |                             |
      *         |       +--------------+      |
@@ -161,18 +166,31 @@ public class KafkaStreams {
      *         |             |
      *         |             v
      *         |       +-----+--------+
-     *         +-----> | Pending      |
-     *                 | Shutdown     |
-     *                 +-----+--------+
-     *                       |
-     *                       v
-     *                 +-----+--------+
-     *                 | Not Running  |
+     *         +-----> | Pending      |<----+
+     *         |       | Shutdown     |     |
+     *         |       +-----+--------+     |
+     *         |             |              |
+     *         |             v              |
+     *         |       +-----+--------+     |
+     *         |       | Not Running  |     |
+     *         |       +--------------+     |
+     *         |                            |
+     *         |       +--------------+     |
+     *         +-----> | Error        |-----+
      *                 +--------------+
+     *
+     *
      * </pre>
+     * Note the following:
+     * - Any state can go to PENDING_SHUTDOWN and subsequently NOT_RUNNING.
+     * - It is theoretically possible for a thread to remain in the PARTITIONS_REVOKED state
+     * (see the {@code StreamThread} state diagram) and hence it is possible that this instance stays
+     * in the REBALANCING state.
+     * - Of special importance: if the global stream thread dies, or all stream threads die (or both), then
+     * the instance will be in the ERROR state. The user will need to close it.
      */
     public enum State {
-        CREATED(1, 2, 3), RUNNING(2, 3), REBALANCING(1, 2, 3), PENDING_SHUTDOWN(4), NOT_RUNNING;
+        CREATED(1, 2, 3, 5), REBALANCING(1, 2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3);
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
@@ -190,10 +208,10 @@ public class KafkaStreams {
             return validTransitions.contains(newState.ordinal());
         }
     }
+    private final Object stateLock = new Object();
     private volatile State state = State.CREATED;
     private StateListener stateListener = null;
 
-
     /**
      * Listen to {@link State} change events.
      */
@@ -216,28 +234,55 @@ public class KafkaStreams {
         stateListener = listener;
     }
 
-    private synchronized void setState(final State newState) {
-        final State oldState = state;
-        if (!state.isValidTransition(newState)) {
-            log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
-        } else {
-            log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
-        }
-
-        state = newState;
+    /**
+     * Sets the state
+     * @param newState New state
+     * @return true if state is set, false otherwise
+     * @throws StreamsException when there is an unexpected transition.
+     */
+    private boolean setState(final State newState) {
+        State oldState;
+        synchronized (stateLock) {
+            // there are cases when we shouldn't check if a transition is valid, e.g.,
+            // when, for testing, Kafka Streams is closed multiple times. We could either
+            // check here and immediately return for those cases, or add them to the transition
+            // diagram (but then the diagram would be confusing and have transitions like
+            // NOT_RUNNING->NOT_RUNNING). These cases include:
+            // - calling close() multiple times. Would mean going from NOT_RUNNING -> PENDING_SHUTDOWN
+            // - calling start() after close(). Would mean going from PENDING_SHUTDOWN (or NOT_RUNNING) -> RUNNING
+
+            // note we could be going from PENDING_SHUTDOWN to NOT_RUNNING, and we obviously want to allow that
+            // transition, hence the check newState != NOT_RUNNING.
+            if (newState != NOT_RUNNING &&
+                    (state == State.NOT_RUNNING || state == PENDING_SHUTDOWN)) {
+                return false;
+            }
 
+            oldState = state;
+            if (!state.isValidTransition(newState)) {
+                log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
+                throw new StreamsException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState);
+            } else {
+                log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
+            }
+            state = newState;
+        }
         if (stateListener != null) {
             stateListener.onChange(state, oldState);
         }
+
+        return true;
     }
 
     /**
      * Return the current {@link State} of this {@code KafkaStreams} instance.
      *
-     * @return the currnt state of this Kafka Streams instance
+     * @return the current state of this Kafka Streams instance
      */
-    public synchronized State state() {
-        return state;
+    public State state() {
+        synchronized (stateLock) {
+            return state;
+        }
     }
 
     /**
@@ -249,22 +294,103 @@ public class KafkaStreams {
         return Collections.unmodifiableMap(metrics.metrics());
     }
 
-    private class StreamStateListener implements StreamThread.StateListener {
-        @Override
-        public synchronized void onChange(final StreamThread thread,
-                                          final StreamThread.State newState,
-                                          final StreamThread.State oldState) {
-            threadState.put(thread.getId(), newState);
-            if (newState == StreamThread.State.PARTITIONS_REVOKED ||
-                newState == StreamThread.State.ASSIGNING_PARTITIONS) {
-                setState(State.REBALANCING);
-            } else if (newState == StreamThread.State.RUNNING) {
-                for (final StreamThread.State state : threadState.values()) {
-                    if (state != StreamThread.State.RUNNING) {
-                        return;
+    /**
+     * Class that handles stream thread transitions
+     */
+    final class StreamStateListener implements StreamThread.StateListener {
+        private final Map<Long, StreamThread.State> threadState;
+        private GlobalStreamThread.State globalThreadState;
+
+        StreamStateListener(final Map<Long, StreamThread.State> threadState,
+                            final GlobalStreamThread.State globalThreadState) {
+            this.threadState = threadState;
+            this.globalThreadState = globalThreadState;
+        }
+
+        /**
+         * If all threads are dead set to ERROR
+         */
+        private void checkAllThreadsDeadAndSetError() {
+
+            synchronized (stateLock) {
+                // if we are pending a shutdown, it's ok for all threads to die, in fact
+                // it is expected. Otherwise, it is an error
+                if (state != PENDING_SHUTDOWN) {
+                    // one thread died, check if we have enough threads running
+                    for (final StreamThread.State state : threadState.values()) {
+                        if (state != StreamThread.State.DEAD) {
+                            return;
+                        }
                     }
+                    log.warn("{} All stream threads have died. The Kafka Streams instance will be in an error state and should be closed.",
+                            logPrefix);
+                    setState(ERROR);
+                }
+            }
+        }
+
+        /**
+         * If the global stream thread is DEAD, set to ERROR
+         */
+        private void maybeSetErrorSinceGlobalStreamThreadIsDead() {
+
+            synchronized (stateLock) {
+                // if we are pending a shutdown, it's ok for all threads to die, in fact
+                // it is expected. Otherwise, it is an error
+                if (state != PENDING_SHUTDOWN) {
+                    log.warn("{} Global Stream thread has died. The Kafka Streams instance will be in an error state and should be closed.",
+                            logPrefix);
+                    setState(ERROR);
+                }
+            }
+        }
+
+        /**
+         * If all threads are up, including the global thread, set to RUNNING
+         */
+        private void maybeSetRunning() {
+            // one thread is running, check others, including global thread
+            for (final StreamThread.State state : threadState.values()) {
+                if (state != StreamThread.State.RUNNING) {
+                    return;
+                }
+            }
+            // the global state thread is relevant only if it is started. There are cases
+            // when we don't have a global state thread at all, e.g., when we don't have global KTables
+            if (globalThreadState != null && globalThreadState != GlobalStreamThread.State.RUNNING) {
+                return;
+            }
+
+            setState(State.RUNNING);
+        }
+
+
+        @Override
+        public synchronized void onChange(final Thread thread,
+                                          final ThreadStateTransitionValidator abstractNewState,
+                                          final ThreadStateTransitionValidator abstractOldState) {
+            // StreamThreads first
+            if (thread instanceof StreamThread) {
+                StreamThread.State newState = (StreamThread.State) abstractNewState;
+                threadState.put(thread.getId(), newState);
+
+                if (newState == StreamThread.State.PARTITIONS_REVOKED ||
+                        newState == StreamThread.State.ASSIGNING_PARTITIONS) {
+                    setState(State.REBALANCING);
+                } else if (newState == StreamThread.State.RUNNING && state() != State.RUNNING) {
+                    maybeSetRunning();
+                } else if (newState == StreamThread.State.DEAD) {
+                    checkAllThreadsDeadAndSetError();
+                }
+            } else if (thread instanceof GlobalStreamThread) {
+                // global stream thread has different invariants
+                GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
+                globalThreadState = newState;
+
+                // special case when global thread is dead
+                if (newState == GlobalStreamThread.State.DEAD) {
+                    maybeSetErrorSinceGlobalStreamThreadIsDead();
                 }
-                setState(State.RUNNING);
             }
         }
     }
@@ -331,6 +457,7 @@ public class KafkaStreams {
 
         threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         threadState = new HashMap<>(threads.length);
+        GlobalStreamThread.State globalThreadState = null;
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
         streamsMetadataState = new StreamsMetadataState(builder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
@@ -352,6 +479,7 @@ public class KafkaStreams {
                                                         metrics,
                                                         time,
                                                         clientId);
+            globalThreadState = globalStreamThread.state();
         }
 
         for (int i = 0; i < threads.length; i++) {
@@ -365,10 +493,16 @@ public class KafkaStreams {
                                           time,
                                           streamsMetadataState,
                                           cacheSizeBytes);
-            threads[i].setStateListener(new StreamStateListener());
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
+        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
+        if (globalTaskTopology != null) {
+            globalStreamThread.setStateListener(streamStateListener);
+        }
+        for (int i = 0; i < threads.length; i++) {
+            threads[i].setStateListener(streamStateListener);
+        }
         final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
     }
@@ -405,7 +539,17 @@ public class KafkaStreams {
         } catch (final IOException e) {
             log.warn("{} Could not close StreamKafkaClient.", logPrefix, e);
         }
+    }
 
+    private void validateStartOnce() {
+        try {
+            if (setState(RUNNING)) {
+                return;
+            }
+        } catch (StreamsException e) {
+            // do nothing, will throw
+        }
+        throw new IllegalStateException("Cannot start again.");
     }
 
     /**
@@ -420,23 +564,18 @@ public class KafkaStreams {
      */
     public synchronized void start() throws IllegalStateException, StreamsException {
         log.debug("{} Starting Kafka Stream process.", logPrefix);
+        validateStartOnce();
+        checkBrokerVersionCompatibility();
 
-        if (state == State.CREATED) {
-            checkBrokerVersionCompatibility();
-            setState(State.RUNNING);
-
-            if (globalStreamThread != null) {
-                globalStreamThread.start();
-            }
-
-            for (final StreamThread thread : threads) {
-                thread.start();
-            }
+        if (globalStreamThread != null) {
+            globalStreamThread.start();
+        }
 
-            log.info("{} Started Kafka Stream process", logPrefix);
-        } else {
-            throw new IllegalStateException("Cannot start again.");
+        for (final StreamThread thread : threads) {
+            thread.start();
         }
+
+        log.info("{} Started Kafka Stream process", logPrefix);
     }
 
     /**
@@ -447,6 +586,25 @@ public class KafkaStreams {
         close(DEFAULT_CLOSE_TIMEOUT, TimeUnit.SECONDS);
     }
 
+    private boolean checkFirstTimeClosing() {
+        return setState(PENDING_SHUTDOWN);
+    }
+
+    private void closeGlobalStreamThread() {
+        if (globalStreamThread != null) {
+            globalStreamThread.setStateListener(null);
+            globalStreamThread.close();
+            if (!globalStreamThread.stillRunning()) {
+                try {
+                    globalStreamThread.join();
+                } catch (final InterruptedException e) {
+                    Thread.interrupted();
+                }
+            }
+            globalStreamThread = null;
+        }
+    }
+
     /**
      * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
      * threads to join.
@@ -456,58 +614,52 @@ public class KafkaStreams {
      * @param timeUnit unit of time used for timeout
      * @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
      * before all threads stopped
+     * Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
      */
     public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
         log.debug("{} Stopping Kafka Stream process.", logPrefix);
-        if (state.isCreatedOrRunning()) {
-            setState(State.PENDING_SHUTDOWN);
-            // save the current thread so that if it is a stream thread
-            // we don't attempt to join it and cause a deadlock
-            final Thread shutdown = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    // signal the threads to stop and wait
-                    for (final StreamThread thread : threads) {
-                        // avoid deadlocks by stopping any further state reports
-                        // from the thread since we're shutting down
-                        thread.setStateListener(null);
-                        thread.close();
-                    }
-                    if (globalStreamThread != null) {
-                        globalStreamThread.close();
-                        if (!globalStreamThread.stillRunning()) {
-                            try {
-                                globalStreamThread.join();
-                            } catch (final InterruptedException e) {
-                                Thread.interrupted();
-                            }
-                        }
-                    }
-                    for (final StreamThread thread : threads) {
-                        try {
-                            if (!thread.stillRunning()) {
-                                thread.join();
-                            }
-                        } catch (final InterruptedException ex) {
-                            Thread.interrupted();
+
+        // only clean up once
+        if (!checkFirstTimeClosing()) {
+            return true;
+        }
+
+        // save the current thread so that if it is a stream thread
+        // we don't attempt to join it and cause a deadlock
+        final Thread shutdown = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                // signal the threads to stop and wait
+                for (final StreamThread thread : threads) {
+                    // avoid deadlocks by stopping any further state reports
+                    // from the thread since we're shutting down
+                    thread.setStateListener(null);
+                    thread.close();
+                }
+                closeGlobalStreamThread();
+                for (final StreamThread thread : threads) {
+                    try {
+                        if (!thread.stillRunning()) {
+                            thread.join();
                         }
+                    } catch (final InterruptedException ex) {
+                        Thread.interrupted();
                     }
-
-                    metrics.close();
-                    log.info("{} Stopped Kafka Streams process.", logPrefix);
                 }
-            }, "kafka-streams-close-thread");
-            shutdown.setDaemon(true);
-            shutdown.start();
-            try {
-                shutdown.join(TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
-            } catch (final InterruptedException e) {
-                Thread.interrupted();
+
+                metrics.close();
+                log.info("{} Stopped Kafka Streams process.", logPrefix);
             }
-            setState(State.NOT_RUNNING);
-            return !shutdown.isAlive();
+        }, "kafka-streams-close-thread");
+        shutdown.setDaemon(true);
+        shutdown.start();
+        try {
+            shutdown.join(TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
+        } catch (final InterruptedException e) {
+            Thread.interrupted();
         }
-        return true;
+        setState(State.NOT_RUNNING);
+        return !shutdown.isAlive();
     }
 
     /**
@@ -544,6 +696,12 @@ public class KafkaStreams {
         return sb.toString();
     }
 
+    private boolean isRunning() {
+        synchronized (stateLock) {
+            return state.isRunning();
+        }
+    }
+
     /**
      * Do a clean up of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all
      * data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}.
@@ -556,7 +714,7 @@ public class KafkaStreams {
      * @throws IllegalStateException if the instance is currently running
      */
     public void cleanUp() {
-        if (state.isRunning()) {
+        if (isRunning()) {
             throw new IllegalStateException("Cannot clean up while running.");
         }
 
@@ -710,7 +868,7 @@ public class KafkaStreams {
     }
 
     private void validateIsRunning() {
-        if (!state.isRunning()) {
+        if (!isRunning()) {
             throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
         }
     }
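
A side note on the mechanism in this file: KafkaStreams.State, like the per-thread state enums changed
further down, encodes its legal transitions as the ordinals of the target states and validates a requested
transition against that set. A minimal standalone sketch of the pattern, using illustrative names rather
than the actual Kafka classes, looks like this:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Mirrors the ordinal-based transition table used by the state enums in this patch.
enum InstanceState {
    CREATED(1, 2, 3, 5),     // -> REBALANCING, RUNNING, PENDING_SHUTDOWN, ERROR
    REBALANCING(1, 2, 3, 5), // includes a self-transition (ordinal 1)
    RUNNING(1, 3, 5),
    PENDING_SHUTDOWN(4),     // -> NOT_RUNNING only
    NOT_RUNNING,             // terminal
    ERROR(3);                // -> PENDING_SHUTDOWN only

    private final Set<Integer> validTransitions = new HashSet<>();

    InstanceState(final Integer... validTransitions) {
        this.validTransitions.addAll(Arrays.asList(validTransitions));
    }

    boolean isValidTransition(final InstanceState newState) {
        // each target state is identified by its ordinal in declaration order
        return validTransitions.contains(newState.ordinal());
    }
}

public class TransitionDemo {
    public static void main(final String[] args) {
        System.out.println(InstanceState.RUNNING.isValidTransition(InstanceState.ERROR));       // true
        System.out.println(InstanceState.NOT_RUNNING.isValidTransition(InstanceState.RUNNING)); // false
    }
}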

http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d9c5f26..0724571 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -389,8 +389,7 @@ public class StreamsConfig extends AbstractConfig {
     // this is the list of configs for underlying clients
     // that streams prefer different default values
     private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
-    static
-    {
+    static {
         final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
         tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, 10);
@@ -399,8 +398,7 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
-    static
-    {
+    static {
         final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
         tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index 6909773..b05ea96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -112,11 +112,11 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
     }
 
 
-    public static long extractEnd(final byte [] binaryKey) {
+    public static long extractEnd(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
     }
 
-    public static long extractStart(final byte [] binaryKey) {
+    public static long extractStart(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
     }
 
@@ -156,7 +156,7 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
         return new Bytes(buf.array());
     }
 
-    public static Window extractWindow(final byte [] binaryKey) {
+    public static Window extractWindow(final byte[] binaryKey) {
         final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
         final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
         final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);

http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 92cfda7..9a25b16 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -299,7 +299,7 @@ public class TopologyBuilder {
      * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}
      */
     public enum AutoOffsetReset {
-        EARLIEST , LATEST
+        EARLIEST, LATEST
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index b4e15f2..7435a66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -31,8 +31,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.Collections;
+import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
+import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
 
 /**
  * This is the thread responsible for keeping all Global State Stores updated.
@@ -48,9 +53,112 @@ public class GlobalStreamThread extends Thread {
     private final ThreadCache cache;
     private final StreamsMetrics streamsMetrics;
     private final ProcessorTopology topology;
-    private volatile boolean running = false;
     private volatile StreamsException startupException;
 
+    /**
+     * The states that the global stream thread can be in
+     *
+     * <pre>
+     *                +-------------+
+     *          +<--- | Created     |
+     *          |     +-----+-------+
+     *          |           |
+     *          |           v
+     *          |     +-----+-------+
+     *          +<--- | Running     |
+     *          |     +-----+-------+
+     *          |           |
+     *          |           v
+     *          |     +-----+-------+
+     *          +---> | Pending     |
+     *                | Shutdown    |
+     *                +-----+-------+
+     *                      |
+     *                      v
+     *                +-----+-------+
+     *                | Dead        |
+     *                +-------------+
+     * </pre>
+     *
+     * Note the following:
+     * - Any state can go to PENDING_SHUTDOWN and subsequently to DEAD
+     *
+     */
+    public enum State implements ThreadStateTransitionValidator {
+        CREATED(1, 2), RUNNING(2), PENDING_SHUTDOWN(3), DEAD;
+
+        private final Set<Integer> validTransitions = new HashSet<>();
+
+        State(final Integer... validTransitions) {
+            this.validTransitions.addAll(Arrays.asList(validTransitions));
+        }
+
+        public boolean isRunning() {
+            return !equals(PENDING_SHUTDOWN) && !equals(CREATED) && !equals(DEAD);
+        }
+
+        public boolean isValidTransition(final ThreadStateTransitionValidator newState) {
+            State tmpState = (State) newState;
+            return validTransitions.contains(tmpState.ordinal());
+        }
+    }
+
+    private volatile State state = State.CREATED;
+    private final Object stateLock = new Object();
+    private StreamThread.StateListener stateListener = null;
+    private final String logPrefix;
+
+
+    /**
+     * Set the {@link StreamThread.StateListener} to be notified when state changes. Note this API is internal to
+     * Kafka Streams and is not intended to be used by an external application.
+     */
+    public void setStateListener(final StreamThread.StateListener listener) {
+        stateListener = listener;
+    }
+
+    /**
+     * @return The state this instance is in
+     */
+    public State state() {
+        synchronized (stateLock) {
+            return state;
+        }
+    }
+
+    /**
+     * Sets the state
+     * @param newState New state
+     * @param ignoreWhenShuttingDownOrDead        if true, then we'll first check if the state is
+     *                                            PENDING_SHUTDOWN or DEAD, and if it is,
+     *                                            we immediately return. Effectively this enables
+     *                                            a conditional set, under the stateLock lock.
+     */
+    void setState(final State newState, boolean ignoreWhenShuttingDownOrDead) {
+        State oldState;
+        synchronized (stateLock) {
+            oldState = state;
+
+            if (ignoreWhenShuttingDownOrDead) {
+                if (state == PENDING_SHUTDOWN || state == DEAD) {
+                    return;
+                }
+            }
+
+            if (!state.isValidTransition(newState)) {
+                log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
+                throw new StreamsException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState);
+            } else {
+                log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
+            }
+
+            state = newState;
+        }
+        if (stateListener != null) {
+            stateListener.onChange(this, state, oldState);
+        }
+    }
+
     public GlobalStreamThread(final ProcessorTopology topology,
                               final StreamsConfig config,
                               final Consumer<byte[], byte[]> globalConsumer,
@@ -70,6 +178,7 @@ public class GlobalStreamThread extends Thread {
         final String threadClientId = clientId + "-" + getName();
         this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
         this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
+        this.logPrefix = String.format("global-stream-thread [%s]", threadClientId);
     }
 
     static class StateConsumer {
@@ -134,15 +243,19 @@ public class GlobalStreamThread extends Thread {
         if (stateConsumer == null) {
             return;
         }
+        // one could kill the thread before it had a chance to actually start
+        setState(State.RUNNING, true);
 
         try {
-            while (running) {
+            while (stillRunning()) {
                 stateConsumer.pollAndUpdate();
             }
             log.debug("Shutting down GlobalStreamThread at user request");
         } finally {
             try {
+                setState(PENDING_SHUTDOWN, true);
                 stateConsumer.close();
+                setState(DEAD, false);
             } catch (IOException e) {
                 log.error("Failed to cleanly shutdown GlobalStreamThread", e);
             }
@@ -165,7 +278,6 @@ public class GlobalStreamThread extends Thread {
                                         config.getLong(StreamsConfig.POLL_MS_CONFIG),
                                         config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
             stateConsumer.initialize();
-            running = true;
             return stateConsumer;
         } catch (StreamsException e) {
             startupException = e;
@@ -178,7 +290,7 @@ public class GlobalStreamThread extends Thread {
     @Override
     public synchronized void start() {
         super.start();
-        while (!running) {
+        while (!stillRunning()) {
             Utils.sleep(1);
             if (startupException != null) {
                 throw startupException;
@@ -186,13 +298,16 @@ public class GlobalStreamThread extends Thread {
         }
     }
 
-
     public void close() {
-        running = false;
+        // one could call close() multiple times, so ignore subsequent calls
+        // if already shutting down or dead
+        setState(PENDING_SHUTDOWN, true);
     }
 
     public boolean stillRunning() {
-        return running;
+        synchronized (stateLock) {
+            return state.isRunning();
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 77cb632..35cecb0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -78,35 +78,46 @@ public class StreamThread extends Thread {
      *
      * <pre>
      *                +-------------+
-     *                | Not Running | <-------+
-     *                +-----+-------+         |
-     *                      |                 |
-     *                      v                 |
-     *                +-----+-------+         |
-     *          +<--- | Running     | <----+  |
-     *          |     +-----+-------+      |  |
-     *          |           |              |  |
-     *          |           v              |  |
-     *          |     +-----+-------+      |  |
-     *          +<--- | Partitions  |      |  |
-     *          |     | Revoked     |      |  |
-     *          |     +-----+-------+      |  |
-     *          |           |              |  |
-     *          |           v              |  |
-     *          |     +-----+-------+      |  |
-     *          |     | Assigning   |      |  |
-     *          |     | Partitions  | ---->+  |
-     *          |     +-----+-------+         |
-     *          |           |                 |
-     *          |           v                 |
-     *          |     +-----+-------+         |
-     *          +---> | Pending     | ------->+
+     *          +<--- | Created     |
+     *          |     +-----+-------+
+     *          |           |
+     *          |           v
+     *          |     +-----+-------+
+     *          +<--- | Running     | <----+
+     *          |     +-----+-------+      |
+     *          |           |              |
+     *          |           v              |
+     *          |     +-----+-------+      |
+     *          +<--- | Partitions  | <-+  |
+     *          |     | Revoked     | --+  |
+     *          |     +-----+-------+      |
+     *          |           |              |
+     *          |           v              |
+     *          |     +-----+-------+      |
+     *          +<--- | Assigning   |      |
+     *          |     | Partitions  | ---->+
+     *          |     +-----+-------+
+     *          |           |
+     *          |           v
+     *          |     +-----+-------+
+     *          +---> | Pending     |
      *                | Shutdown    |
+     *                +-----+-------+
+     *                      |
+     *                      v
+     *                +-----+-------+
+     *                | Dead        |
      *                +-------------+
      * </pre>
+     *
+     * Note the following:
+     * - Any state can go to PENDING_SHUTDOWN followed by a subsequent transition to DEAD.
+     * - A streams thread can stay in PARTITIONS_REVOKED indefinitely, in the corner case when
+     *   the coordinator repeatedly fails in-between revoking partitions and assigning new partitions.
+     *
      */
-    public enum State {
-        NOT_RUNNING(1), RUNNING(1, 2, 4), PARTITIONS_REVOKED(3, 4), ASSIGNING_PARTITIONS(1, 4), PENDING_SHUTDOWN(0);
+    public enum State implements ThreadStateTransitionValidator {
+        CREATED(1, 4), RUNNING(2, 4), PARTITIONS_REVOKED(2, 3, 4), ASSIGNING_PARTITIONS(1, 4), PENDING_SHUTDOWN(5), DEAD;
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
@@ -115,15 +126,18 @@ public class StreamThread extends Thread {
         }
 
         public boolean isRunning() {
-            return !this.equals(PENDING_SHUTDOWN) && !this.equals(NOT_RUNNING);
+            return !equals(PENDING_SHUTDOWN) && !equals(CREATED) && !equals(DEAD);
         }
 
-        public boolean isValidTransition(final State newState) {
-            return validTransitions.contains(newState.ordinal());
+        @Override
+        public boolean isValidTransition(final ThreadStateTransitionValidator newState) {
+            State tmpState = (State) newState;
+            return validTransitions.contains(tmpState.ordinal());
         }
     }
 
-    private volatile State state = State.NOT_RUNNING;
+    private volatile State state = State.CREATED;
+    private final Object stateLock = new Object();
     private StateListener stateListener = null;
 
     /**
@@ -137,7 +151,7 @@ public class StreamThread extends Thread {
          * @param newState     current state
          * @param oldState     previous state
          */
-        void onChange(final StreamThread thread, final State newState, final State oldState);
+        void onChange(final Thread thread, final ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator oldState);
     }
 
     /**
@@ -151,31 +165,50 @@ public class StreamThread extends Thread {
     /**
      * @return The state this instance is in
      */
-    public synchronized State state() {
-        return state;
+    public State state() {
+        synchronized (stateLock) {
+            return state;
+        }
     }
 
-    private synchronized void setState(State newState) {
-        State oldState = state;
-        if (!state.isValidTransition(newState)) {
-            log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
-        } else {
-            log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
-        }
 
-        state = newState;
+    /**
+     * Sets the state
+     * @param newState New state
+     */
+    void setState(final State newState) {
+        State oldState;
+        synchronized (stateLock) {
+            oldState = state;
+
+            // there are cases when we shouldn't check if a transition is valid, e.g.,
+            // when, for testing, a thread is closed multiple times. We could either
+            // check here and immediately return for those cases, or add them to the transition
+            // diagram (but then the diagram would be confusing and have transitions like
+            // PENDING_SHUTDOWN->PENDING_SHUTDOWN). These cases include:
+            // - normal close() sequence. State is set to PENDING_SHUTDOWN in close() as well as in shutdown().
+            // - calling close() on the thread after an exception within the thread has already called shutdown().
+
+            // note we could be going from PENDING_SHUTDOWN to DEAD, and we obviously want to allow that
+            // transition, hence the check newState != DEAD.
+            if (newState != State.DEAD &&
+                    (state == State.PENDING_SHUTDOWN || state == State.DEAD)) {
+                return;
+            }
+            if (!state.isValidTransition(newState)) {
+                log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
+                throw new StreamsException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState);
+            } else {
+                log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
+            }
+
+            state = newState;
+        }
         if (stateListener != null) {
             stateListener.onChange(this, state, oldState);
         }
     }
 
-    private synchronized void setStateWhenNotInPendingShutdown(final State newState) {
-        if (state == State.PENDING_SHUTDOWN) {
-            return;
-        }
-        setState(newState);
-    }
-
     public final PartitionGrouper partitionGrouper;
     private final StreamsMetadataState streamsMetadataState;
     public final String applicationId;
@@ -226,7 +259,7 @@ public class StreamThread extends Thread {
 
             try {
                 log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.", logPrefix, state, assignment);
-                setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
+                setState(State.ASSIGNING_PARTITIONS);
                 // do this first as we may have suspended standby tasks that
                 // will become active or vice versa
                 closeNonAssignedSuspendedStandbyTasks();
@@ -235,7 +268,7 @@ public class StreamThread extends Thread {
                 addStandbyTasks();
                 lastCleanMs = time.milliseconds(); // start the cleaning cycle
                 streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
-                setStateWhenNotInPendingShutdown(State.RUNNING);
+                setState(State.RUNNING);
             } catch (Throwable t) {
                 rebalanceException = t;
                 throw t;
@@ -246,7 +279,7 @@ public class StreamThread extends Thread {
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
             try {
                 log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.", logPrefix, state, assignment);
-                setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
+                setState(State.PARTITIONS_REVOKED);
                 lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
                 // suspend active tasks
                 suspendTasksAndState();
@@ -261,8 +294,10 @@ public class StreamThread extends Thread {
         }
     };
 
-    public synchronized boolean isInitialized() {
-        return state == State.RUNNING;
+    public boolean isInitialized() {
+        synchronized (stateLock) {
+            return state == State.RUNNING;
+        }
     }
 
     public String threadClientId() {
@@ -340,7 +375,6 @@ public class StreamThread extends Thread {
         this.timerStartedMs = time.milliseconds();
         this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
         this.lastCommitMs = timerStartedMs;
-        setState(State.RUNNING);
     }
 
     public void partitionAssignor(StreamPartitionAssignor partitionAssignor) {
@@ -356,7 +390,7 @@ public class StreamThread extends Thread {
     @Override
     public void run() {
         log.info("{} Starting", logPrefix);
-
+        setState(State.RUNNING);
         try {
             runLoop();
             cleanRun = true;
@@ -375,6 +409,8 @@ public class StreamThread extends Thread {
 
     /**
      * Shutdown this stream thread.
+     * Note that there is nothing to prevent this function from being called multiple times
+     * (e.g., in testing), hence the state is set only the first time
      */
     public synchronized void close() {
         log.info("{} Informed thread to shut down", logPrefix);
@@ -388,6 +424,7 @@ public class StreamThread extends Thread {
 
     private void shutdown() {
         log.info("{} Shutting down", logPrefix);
+        setState(State.PENDING_SHUTDOWN);
         shutdownTasksAndState();
 
         // close all embedded clients
@@ -418,7 +455,7 @@ public class StreamThread extends Thread {
         // clean up global tasks
 
         log.info("{} Stream thread shutdown complete", logPrefix);
-        setState(State.NOT_RUNNING);
+        setState(State.DEAD);
         streamsMetrics.removeAllSensors();
     }
 
@@ -740,8 +777,10 @@ public class StreamThread extends Thread {
         }
     }
 
-    public synchronized boolean stillRunning() {
-        return state.isRunning();
+    public boolean stillRunning() {
+        synchronized (stateLock) {
+            return state.isRunning();
+        }
     }
 
     private void maybePunctuate(StreamTask task) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadStateTransitionValidator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadStateTransitionValidator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadStateTransitionValidator.java
new file mode 100644
index 0000000..f60c4de
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadStateTransitionValidator.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+/**
+ * Basic interface for keeping track of the state of a thread.
+ */
+public interface ThreadStateTransitionValidator {
+    boolean isValidTransition(final ThreadStateTransitionValidator newState);
+}
\ No newline at end of file
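
The interface above exists so that KafkaStreams can consume state changes from both StreamThread and
GlobalStreamThread through a single onChange(Thread, validator, validator) callback and then downcast
based on the concrete thread type, as the new StreamStateListener does. A minimal standalone sketch of
that dispatch, using placeholder names rather than the real Kafka internals:

// Illustrative only: two thread types, two state enums, one listener signature.
interface TransitionValidator {
    boolean isValidTransition(TransitionValidator newState);
}

class WorkerThread extends Thread {
    enum State implements TransitionValidator {
        CREATED, RUNNING, DEAD;
        public boolean isValidTransition(final TransitionValidator newState) {
            return ((State) newState).ordinal() >= ordinal(); // simplified, forward-only rule
        }
    }
}

class GlobalThread extends Thread {
    enum State implements TransitionValidator {
        CREATED, RUNNING, DEAD;
        public boolean isValidTransition(final TransitionValidator newState) {
            return ((State) newState).ordinal() >= ordinal(); // simplified, forward-only rule
        }
    }
}

class CompositeListener {
    void onChange(final Thread thread, final TransitionValidator newState, final TransitionValidator oldState) {
        if (thread instanceof WorkerThread) {
            final WorkerThread.State s = (WorkerThread.State) newState; // per-thread-type downcast
            System.out.println("worker thread " + thread.getId() + " is now " + s);
        } else if (thread instanceof GlobalThread) {
            final GlobalThread.State s = (GlobalThread.State) newState;
            System.out.println("global thread is now " + s);
        }
    }
}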

http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 5dae8dd..50ab117 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -29,7 +29,10 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
+import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,6 +54,7 @@ import static org.junit.Assert.assertTrue;
 public class KafkaStreamsTest {
 
     private static final int NUM_BROKERS = 1;
+    private static final int NUM_THREADS = 2;
     // We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
     // quick enough)
     @ClassRule
@@ -66,17 +70,14 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
         streams = new KafkaStreams(builder, props);
     }
 
     @Test
-    public void testInitializesAndDestroysMetricsReporters() throws Exception {
-        final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
+    public void testStateChanges() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
-        final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
-        final int initDiff = newInitCount - oldInitCount;
-        assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
 
         StateListenerStub stateListener = new StateListenerStub();
         streams.setStateListener(stateListener);
@@ -84,17 +85,135 @@ public class KafkaStreamsTest {
         Assert.assertEquals(stateListener.numChanges, 0);
 
         streams.start();
-        Assert.assertEquals(streams.state(), KafkaStreams.State.RUNNING);
-        Assert.assertEquals(stateListener.numChanges, 1);
-        Assert.assertEquals(stateListener.oldState, KafkaStreams.State.CREATED);
-        Assert.assertEquals(stateListener.newState, KafkaStreams.State.RUNNING);
-        Assert.assertEquals(stateListener.mapStates.get(KafkaStreams.State.RUNNING).longValue(), 1L);
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return streams.state() == KafkaStreams.State.RUNNING;
+            }
+        }, 10 * 1000, "Streams never started.");
+        streams.close();
+        Assert.assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+    }
+
+    @Test
+    public void testStateCloseAfterCreate() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+
+        StateListenerStub stateListener = new StateListenerStub();
+        streams.setStateListener(stateListener);
+        streams.close();
+        Assert.assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+    }
+
+    private void testStateThreadCloseHelper(final int numThreads) throws Exception {
+        final java.lang.reflect.Field threadsField = streams.getClass().getDeclaredField("threads");
+        threadsField.setAccessible(true);
+        final StreamThread[] threads = (StreamThread[]) threadsField.get(streams);
+
+        assertEquals(numThreads, threads.length);
+        assertEquals(streams.state(), KafkaStreams.State.CREATED);
+
+        streams.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return streams.state() == KafkaStreams.State.RUNNING;
+            }
+        }, 10 * 1000, "Streams never started.");
+
+        for (int i = 0; i < numThreads; i++) {
+            final StreamThread tmpThread = threads[i];
+            tmpThread.close();
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return tmpThread.state() == StreamThread.State.DEAD;
+                }
+            }, 10 * 1000, "Thread never stopped.");
+            threads[i].join();
+        }
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return streams.state() == KafkaStreams.State.ERROR;
+            }
+        }, 10 * 1000, "Streams never stopped.");
+        streams.close();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return streams.state() == KafkaStreams.State.NOT_RUNNING;
+            }
+        }, 10 * 1000, "Streams never stopped.");
+
+        final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
+        globalThreadField.setAccessible(true);
+        GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
+        assertEquals(globalStreamThread, null);
+    }
+
+    @Test
+    public void testStateThreadClose() throws Exception {
+        final int numThreads = 2;
+        final KStreamBuilder builder = new KStreamBuilder();
+        // make sure we have the global state thread running too
+        builder.globalTable("anyTopic", "anyStore");
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+
+        testStateThreadCloseHelper(numThreads);
+    }
+    
+    @Test
+    public void testStateGlobalThreadClose() throws Exception {
+        final int numThreads = 2;
+        final KStreamBuilder builder = new KStreamBuilder();
+        // make sure we have the global state thread running too
+        builder.globalTable("anyTopic", "anyStoreName");
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+
+
+        streams.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return streams.state() == KafkaStreams.State.RUNNING;
+            }
+        }, 10 * 1000, "Streams never started.");
+        final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
+        globalThreadField.setAccessible(true);
+        final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams);
+        globalStreamThread.close();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return globalStreamThread.state() == GlobalStreamThread.State.DEAD;
+            }
+        }, 10 * 1000, "Thread never stopped.");
+        globalStreamThread.join();
+        assertEquals(streams.state(), KafkaStreams.State.ERROR);
+
+        streams.close();
+        assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+
+    }
+
+
+    @Test
+    public void testInitializesAndDestroysMetricsReporters() throws Exception {
+        final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
+        final int initDiff = newInitCount - oldInitCount;
+        assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
+
+        streams.start();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
         streams.close();
         assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
-        Assert.assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
-        Assert.assertEquals(stateListener.mapStates.get(KafkaStreams.State.RUNNING).longValue(), 1L);
-        Assert.assertEquals(stateListener.mapStates.get(KafkaStreams.State.NOT_RUNNING).longValue(), 1L);
     }
 
     @Test
@@ -277,6 +396,12 @@ public class KafkaStreamsTest {
         final KafkaStreams streams = new KafkaStreams(builder, props);
 
         streams.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return streams.state() == KafkaStreams.State.RUNNING;
+            }
+        }, 10 * 1000, "Streams never started.");
         try {
             streams.cleanUp();
         } catch (final IllegalStateException e) {
@@ -293,6 +418,23 @@ public class KafkaStreamsTest {
         public KafkaStreams.State oldState;
         public KafkaStreams.State newState;
         public Map<KafkaStreams.State, Long> mapStates = new HashMap<>();
+        private final boolean closeOnChange;
+        private final KafkaStreams streams;
+
+        public StateListenerStub() {
+            this.closeOnChange = false;
+            this.streams = null;
+        }
+
+        /**
+         * For testing only; closes the given streams instance when it transitions to NOT_RUNNING or ERROR.
+         * @param closeOnChange whether to close {@code streams} on such a transition
+         * @param streams the streams instance to close
+         */
+        public StateListenerStub(final boolean closeOnChange, final KafkaStreams streams) {
+            this.closeOnChange = closeOnChange;
+            this.streams = streams;
+        }
 
         @Override
         public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
@@ -301,6 +443,11 @@ public class KafkaStreamsTest {
             this.oldState = oldState;
             this.newState = newState;
             this.mapStates.put(newState, prevCount + 1);
+            if (this.closeOnChange &&
+                    (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR)) {
+                streams.close();
+            }
         }
+
     }
 }

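The KafkaStreamsTest changes above replace point-in-time assertions with state polling and exercise the KafkaStreams.StateListener plumbing via StateListenerStub. For reference, a minimal sketch of how a caller observes those transitions through the public listener hook, assuming a configured KafkaStreams instance named streams; the listener body is illustrative only and not part of the patch:

    streams.setStateListener(new KafkaStreams.StateListener() {
        @Override
        public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
            // React to the transition, e.g. log it or trigger a shutdown on ERROR.
            System.out.println("KafkaStreams moved from " + oldState + " to " + newState);
        }
    });

As in the tests, assertions on the resulting state are made only after TestUtils.waitForCondition confirms the client has actually reached RUNNING, NOT_RUNNING or ERROR.
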
http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 5e7d02b..6a8c7ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -126,8 +126,8 @@ public class KStreamAggregationDedupIntegrationTest {
 
         List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 5);
+            new StringDeserializer(),
+                5);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
@@ -176,8 +176,8 @@ public class KStreamAggregationDedupIntegrationTest {
 
         List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 10);
+            new StringDeserializer(),
+                10);
 
         Comparator<KeyValue<String, String>>
             comparator =
@@ -228,8 +228,8 @@ public class KStreamAggregationDedupIntegrationTest {
 
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
-            new LongDeserializer()
-            , 5);
+            new LongDeserializer(),
+                5);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index ce30212..bd5911d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -154,8 +154,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 10);
+            new StringDeserializer(),
+                10);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
@@ -208,8 +208,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 15);
+            new StringDeserializer(),
+                15);
 
         final Comparator<KeyValue<String, String>>
             comparator =
@@ -262,8 +262,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Integer>> results = receiveMessages(
             new StringDeserializer(),
-            new IntegerDeserializer()
-            , 10);
+            new IntegerDeserializer(),
+                10);
 
         Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
             @Override
@@ -312,8 +312,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
             new StringDeserializer(),
-            new IntegerDeserializer()
-            , 15);
+            new IntegerDeserializer(),
+                15);
 
         final Comparator<KeyValue<String, Integer>>
             comparator =
@@ -363,8 +363,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
-            new LongDeserializer()
-            , 10);
+            new LongDeserializer(),
+                10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
@@ -405,8 +405,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
-            new LongDeserializer()
-            , 10);
+            new LongDeserializer(),
+                10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 67138f7..29e1a32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -28,16 +28,18 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
-
+import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
 
 public class GlobalStreamThreadTest {
     private final KStreamBuilder builder = new KStreamBuilder();
@@ -75,7 +77,7 @@ public class GlobalStreamThreadTest {
 
 
     @Test
-    public void shouldBeRunningAfterSuccesulStart() throws Exception {
+    public void shouldBeRunningAfterSuccessfulStart() throws Exception {
         initializeConsumer();
         globalStreamThread.start();
         assertTrue(globalStreamThread.stillRunning());
@@ -87,6 +89,7 @@ public class GlobalStreamThreadTest {
         globalStreamThread.start();
         globalStreamThread.close();
         globalStreamThread.join();
+        assertEquals(GlobalStreamThread.State.DEAD, globalStreamThread.state());
     }
 
     @Test
@@ -100,6 +103,46 @@ public class GlobalStreamThreadTest {
         assertFalse(globalStore.isOpen());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldTransitionToDeadOnClose() throws InterruptedException {
+
+        initializeConsumer();
+        globalStreamThread.start();
+        globalStreamThread.close();
+        globalStreamThread.join();
+
+        assertEquals(GlobalStreamThread.State.DEAD, globalStreamThread.state());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldStayDeadAfterTwoCloses() throws InterruptedException {
+
+        initializeConsumer();
+        globalStreamThread.start();
+        globalStreamThread.close();
+        globalStreamThread.join();
+        globalStreamThread.close();
+
+        assertEquals(GlobalStreamThread.State.DEAD, globalStreamThread.state());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldTransitiontoRunningOnStart() throws InterruptedException {
+
+        initializeConsumer();
+        globalStreamThread.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return globalStreamThread.state() == RUNNING;
+            }
+        }, 10 * 1000, "Thread never started.");
+        globalStreamThread.close();
+    }
+
     private void initializeConsumer() {
         mockConsumer.updatePartitions("foo", Collections.singletonList(new PartitionInfo("foo",
                                                                                          0,

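The GlobalStreamThreadTest additions all drive the thread through the same lifecycle before asserting on its state. A condensed sketch of that sequence, assuming a GlobalStreamThread constructed as in the test's setup (initialisation omitted):

    globalStreamThread.start();
    globalStreamThread.close();
    globalStreamThread.join();   // wait for the run loop to exit
    // the terminal state must be DEAD, and a second close() must leave it there
    assertEquals(GlobalStreamThread.State.DEAD, globalStreamThread.state());
    globalStreamThread.close();
    assertEquals(GlobalStreamThread.State.DEAD, globalStreamThread.state());
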
http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c2f62d6..33d62f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -75,7 +76,11 @@ public class StreamThreadTest {
 
     private final String clientId = "clientId";
     private final String applicationId = "stream-thread-test";
+    private final Metrics metrics = new Metrics();
+    private final MockClientSupplier clientSupplier = new MockClientSupplier();
     private UUID processId = UUID.randomUUID();
+    final KStreamBuilder builder = new KStreamBuilder();
+    private final StreamsConfig config = new StreamsConfig(configProps());
 
     @Before
     public void setUp() throws Exception {
@@ -219,11 +224,11 @@ public class StreamThreadTest {
         };
 
         thread.setStateListener(stateListener);
-        assertEquals(thread.state(), StreamThread.State.RUNNING);
+        assertEquals(thread.state(), StreamThread.State.CREATED);
         initPartitionGrouper(config, thread, mockClientSupplier);
 
         ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-
+        thread.setState(StreamThread.State.RUNNING);
         assertTrue(thread.tasks().isEmpty());
 
         List<TopicPartition> revokedPartitions;
@@ -237,14 +242,10 @@ public class StreamThreadTest {
 
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
         assertEquals(thread.state(), StreamThread.State.PARTITIONS_REVOKED);
-        Assert.assertEquals(stateListener.numChanges, 1);
-        Assert.assertEquals(stateListener.oldState, StreamThread.State.RUNNING);
-        Assert.assertEquals(stateListener.newState, StreamThread.State.PARTITIONS_REVOKED);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         assertEquals(thread.state(), StreamThread.State.RUNNING);
-        Assert.assertEquals(stateListener.numChanges, 3);
+        Assert.assertEquals(stateListener.numChanges, 4);
         Assert.assertEquals(stateListener.oldState, StreamThread.State.ASSIGNING_PARTITIONS);
-        Assert.assertEquals(stateListener.newState, StreamThread.State.RUNNING);
 
         assertTrue(thread.tasks().containsKey(task1));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
@@ -328,10 +329,45 @@ public class StreamThreadTest {
         assertTrue(thread.tasks().isEmpty());
 
         thread.close();
-        assertTrue((thread.state() == StreamThread.State.PENDING_SHUTDOWN) ||
-            (thread.state() == StreamThread.State.NOT_RUNNING));
+        assertEquals(thread.state(), StreamThread.State.PENDING_SHUTDOWN);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testStateChangeStartClose() throws InterruptedException {
+
+        final StreamThread thread = new StreamThread(
+                builder,
+                config,
+                clientSupplier,
+                applicationId,
+                clientId,
+                processId,
+                metrics,
+                Time.SYSTEM,
+                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                0);
+
+        final StateListenerStub stateListener = new StateListenerStub();
+        thread.setStateListener(stateListener);
+        thread.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return thread.state() == StreamThread.State.RUNNING;
+            }
+        }, 10 * 1000, "Thread never started.");
+        thread.close();
+        assertEquals(thread.state(), StreamThread.State.PENDING_SHUTDOWN);
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return thread.state() == StreamThread.State.DEAD;
+            }
+        }, 10 * 1000, "Thread never shut down.");
+        thread.close();
+        assertEquals(thread.state(), StreamThread.State.DEAD);
+    }
     final static String TOPIC = "topic";
     final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0));
     final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1));
@@ -347,12 +383,10 @@ public class StreamThreadTest {
                 .persistent()
                 .build()
         );
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
-        mockClientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
+        builder.addSource("source", TOPIC);
 
-        final StreamThread thread1 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
-        final StreamThread thread2 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread thread1 = new StreamThread(builder, config, clientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread thread2 = new StreamThread(builder, config, clientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
 
         final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment);
         final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment);
@@ -363,6 +397,22 @@ public class StreamThreadTest {
         thread1.partitionAssignor(new MockStreamsPartitionAssignor(thread1Assignment));
         thread2.partitionAssignor(new MockStreamsPartitionAssignor(thread2Assignment));
 
+        thread1.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return thread1.state() == StreamThread.State.RUNNING;
+            }
+        }, 10 * 1000, "Thread never started.");
+
+        thread2.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return thread2.state() == StreamThread.State.RUNNING;
+            }
+        }, 10 * 1000, "Thread never started.");
+
         // revoke (to get threads in correct state)
         thread1.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
         thread2.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
@@ -437,7 +487,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void testMetrics() throws Exception {
+    public void testMetrics() {
         TopologyBuilder builder = new TopologyBuilder().setApplicationId("MetricsApp");
         StreamsConfig config = new StreamsConfig(configProps());
         MockClientSupplier clientSupplier = new MockClientSupplier();
@@ -546,7 +596,7 @@ public class StreamThreadTest {
             revokedPartitions = Collections.emptyList();
             assignedPartitions = Arrays.asList(t1p1, t1p2);
             prevTasks = new HashMap<>(thread.tasks());
-
+            thread.setState(StreamThread.State.RUNNING);
             rebalanceListener.onPartitionsRevoked(revokedPartitions);
             rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
@@ -580,7 +630,6 @@ public class StreamThreadTest {
             revokedPartitions = assignedPartitions;
             assignedPartitions = Collections.emptyList();
             prevTasks = new HashMap<>(thread.tasks());
-
             rebalanceListener.onPartitionsRevoked(revokedPartitions);
             rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
@@ -660,7 +709,8 @@ public class StreamThreadTest {
             //
             revokedPartitions = Collections.emptyList();
             assignedPartitions = Arrays.asList(t1p1, t1p2);
-
+            thread.setState(StreamThread.State.RUNNING);
+            thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
             rebalanceListener.onPartitionsRevoked(revokedPartitions);
             rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
@@ -733,6 +783,7 @@ public class StreamThreadTest {
             }
         });
 
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
     }
@@ -774,6 +825,7 @@ public class StreamThreadTest {
             }
         });
 
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
 
@@ -840,6 +892,7 @@ public class StreamThreadTest {
             }
         });
 
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t2));
 
@@ -890,6 +943,7 @@ public class StreamThreadTest {
         });
 
         // should create task for id 0_0 with a single partition
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
 
@@ -947,7 +1001,7 @@ public class StreamThreadTest {
 
 
         thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
-
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
 
@@ -998,7 +1052,7 @@ public class StreamThreadTest {
 
 
         thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
-
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
         // store should have been opened
@@ -1051,7 +1105,7 @@ public class StreamThreadTest {
 
 
         thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
-
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
         try {
@@ -1102,7 +1156,7 @@ public class StreamThreadTest {
 
 
         thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
-
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
         try {
@@ -1154,6 +1208,8 @@ public class StreamThreadTest {
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         activeTasks.put(testStreamTask.id, testStreamTask.partitions);
         thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
@@ -1198,8 +1254,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
             clientId, processId, new Metrics(), new MockTime(),
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0)
-        {
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
             @Override
             protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return new StandbyTask(
@@ -1224,6 +1279,8 @@ public class StreamThreadTest {
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
         standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0)));
         thread.partitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks));
+        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
         thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
@@ -1253,13 +1310,14 @@ public class StreamThreadTest {
         partitionAssignor.onAssignment(assignments.get("client"));
     }
 
-    public static class StateListenerStub implements StreamThread.StateListener {
-        public int numChanges = 0;
-        public StreamThread.State oldState = null;
-        public StreamThread.State newState = null;
+    private static class StateListenerStub implements StreamThread.StateListener {
+        int numChanges = 0;
+        ThreadStateTransitionValidator oldState = null;
+        ThreadStateTransitionValidator newState = null;
 
         @Override
-        public void onChange(final StreamThread thread, final StreamThread.State newState, final StreamThread.State oldState) {
+        public void onChange(final Thread thread, final ThreadStateTransitionValidator newState,
+                             final ThreadStateTransitionValidator oldState) {
             this.numChanges++;
             if (this.newState != null) {
                 if (this.newState != oldState) {

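A recurring change in the StreamThreadTest hunks above is that the rebalance callbacks are now driven only after the thread has been put into a state from which the transition is valid. A minimal sketch of that ordering, assuming a constructed StreamThread named thread and an assignedPartitions collection as in the surrounding tests:

    // With transition validation, the thread must first be RUNNING (or, when a test
    // assigns before revoking, RUNNING then PARTITIONS_REVOKED) before the consumer
    // rebalance listener callbacks may be invoked.
    thread.setState(StreamThread.State.RUNNING);
    thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
    thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
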
http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 5bdc4f1..76647d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -62,7 +62,7 @@ public class StreamsMetadataStateTest {
     private TopicPartition topic4P0;
     private List<PartitionInfo> partitionInfos;
     private Cluster cluster;
-    private final String globalTable = "global-table";;
+    private final String globalTable = "global-table";
     private StreamPartitioner<String, Object> partitioner;
 
     @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/517bcc49/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 7ace43a..b4cbf6f 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -194,7 +194,7 @@ public class ProcessorTopologyTestDriver {
             final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
             for (final String topicName : globalTopology.sourceTopics()) {
                 List<PartitionInfo> partitionInfos = new ArrayList<>();
-                partitionInfos.add(new PartitionInfo(topicName , 1, null, null, null));
+                partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
                 globalConsumer.updatePartitions(topicName, partitionInfos);
                 final TopicPartition partition = new TopicPartition(topicName, 1);
                 globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));

