kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: Streams fix state transition stuck on rebalance
Date Thu, 15 Dec 2016 17:44:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 704993816 -> ea724497a


HOTFIX: Streams fix state transition stuck on rebalance

This fixes a problem where the KafkaStreams instance state transition gets stuck on rebalance (thanks
to dguy for pointing it out). Also adjusts the test in QueryableStateIntegrationTest.

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang

Closes #2252 from enothereska/hotfix_state_never_running


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ea724497
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ea724497
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ea724497

Branch: refs/heads/trunk
Commit: ea724497a85e922af6acccd02d7e7466871fb339
Parents: 7049938
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Thu Dec 15 09:44:09 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Dec 15 09:44:09 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 24 +++++++++++++----
 .../processor/internals/StreamThread.java       |  7 +++--
 .../apache/kafka/streams/KafkaStreamsTest.java  |  8 ++++++
 .../QueryableStateIntegrationTest.java          | 27 ++++++++++++++------
 .../processor/internals/StreamThreadTest.java   |  2 +-
 5 files changed, 50 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ea724497/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index dc6907f..548e594 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -42,7 +42,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.Set;
@@ -95,6 +97,7 @@ public class KafkaStreams {
     private static final String JMX_PREFIX = "kafka.streams";
     public static final int DEFAULT_CLOSE_TIMEOUT = 0;
     private final StreamThread[] threads;
+    private final Map<Long, StreamThread.State> threadState;
     private final Metrics metrics;
     private final QueryableStoreProvider queryableStoreProvider;
 
@@ -204,10 +207,18 @@ public class KafkaStreams {
 
     private class StreamStateListener implements StreamThread.StateListener {
         @Override
-        public void onChange(final StreamThread.State newState, final StreamThread.State
oldState) {
+        public synchronized void onChange(final StreamThread thread, final StreamThread.State
newState, final StreamThread.State oldState) {
+            threadState.put(thread.getId(), newState);
             if (newState == StreamThread.State.PARTITIONS_REVOKED ||
                 newState == StreamThread.State.ASSIGNING_PARTITIONS) {
-                setState(KafkaStreams.State.REBALANCING);
+                setState(State.REBALANCING);
+            } else if (newState == StreamThread.State.RUNNING) {
+                for (StreamThread.State state : threadState.values()) {
+                    if (state != StreamThread.State.RUNNING) {
+                        return;
+                    }
+                }
+                setState(State.RUNNING);
             }
         }
     }
@@ -267,6 +278,7 @@ public class KafkaStreams {
         metrics = new Metrics(metricConfig, reporters, time);
 
         threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threadState = new HashMap<>(threads.length);
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
         streamsMetadataState = new StreamsMetadataState(builder);
         for (int i = 0; i < threads.length; i++) {
@@ -280,6 +292,7 @@ public class KafkaStreams {
                 time,
                 streamsMetadataState);
             threads[i].setStateListener(streamStateListener);
+            threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
         queryableStoreProvider = new QueryableStoreProvider(storeProviders);
@@ -294,11 +307,12 @@ public class KafkaStreams {
         log.debug("Starting Kafka Stream process");
 
         if (state == KafkaStreams.State.CREATED) {
-            for (final StreamThread thread : threads)
-                thread.start();
-
             setState(KafkaStreams.State.RUNNING);
 
+            for (final StreamThread thread : threads) {
+                thread.start();
+            }
+
             log.info("Started Kafka Stream process");
         } else {
             throw new IllegalStateException("Cannot start again.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea724497/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a7793f8..cfbd3a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -134,10 +134,11 @@ public class StreamThread extends Thread {
 
         /**
          * Called when state changes
+         * @param thread       thread changing state
          * @param newState     current state
          * @param oldState     previous state
          */
-        void onChange(final State newState, final State oldState);
+        void onChange(final StreamThread thread, final State newState, final State oldState);
     }
 
     /**
@@ -164,9 +165,7 @@ public class StreamThread extends Thread {
         }
         state = newState;
         if (stateListener != null) {
-            synchronized (stateListener) {
-                stateListener.onChange(state, oldState);
-            }
+            stateListener.onChange(this, state, oldState);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea724497/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index e8b46cc..37809bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -31,6 +31,8 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -69,6 +71,7 @@ public class KafkaStreamsTest {
         Assert.assertEquals(stateListener.numChanges, 1);
         Assert.assertEquals(stateListener.oldState, KafkaStreams.State.CREATED);
         Assert.assertEquals(stateListener.newState, KafkaStreams.State.RUNNING);
+        Assert.assertEquals(stateListener.mapStates.get(KafkaStreams.State.RUNNING).longValue(),
1L);
 
         final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int initCountDifference = newInitCount - oldInitCount;
@@ -78,6 +81,8 @@ public class KafkaStreamsTest {
         Assert.assertEquals("each reporter initialized should also be closed",
             oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
         Assert.assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+        Assert.assertEquals(stateListener.mapStates.get(KafkaStreams.State.RUNNING).longValue(),
1L);
+        Assert.assertEquals(stateListener.mapStates.get(KafkaStreams.State.NOT_RUNNING).longValue(),
1L);
     }
 
     @Test
@@ -262,12 +267,15 @@ public class KafkaStreamsTest {
         public int numChanges = 0;
         public KafkaStreams.State oldState;
         public KafkaStreams.State newState;
+        public Map<KafkaStreams.State, Long> mapStates = new HashMap<>();
 
         @Override
         public void onChange(final KafkaStreams.State newState, final KafkaStreams.State
oldState) {
+            Long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState)
: 0;
             this.numChanges++;
             this.oldState = oldState;
             this.newState = newState;
+            this.mapStates.put(newState, prevCount + 1);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea724497/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 02c9fc1..6dc1782 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreamsTest;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -228,11 +229,13 @@ public class QueryableStateIntegrationTest {
     private class StreamRunnable implements Runnable {
         private final KafkaStreams myStream;
         private boolean closed = false;
+        private KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub();
 
         StreamRunnable(final String inputTopic, final String outputTopic, final int queryPort)
{
             final Properties props = (Properties) streamsConfiguration.clone();
             props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + queryPort);
             myStream = createCountStream(inputTopic, outputTopic, props);
+            myStream.setStateListener(stateListener);
         }
 
         @Override
@@ -255,9 +258,14 @@ public class QueryableStateIntegrationTest {
         public final KafkaStreams getStream() {
             return myStream;
         }
+
+        public final KafkaStreamsTest.StateListenerStub getStateListener() {
+            return stateListener;
+        }
     }
 
     private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams
streams,
+                                 final KafkaStreamsTest.StateListenerStub stateListenerStub,
                                  final Set<String> keys, final String storeName) throws
Exception {
         for (final String key : keys) {
             TestUtils.waitForCondition(new TestCondition() {
@@ -276,8 +284,8 @@ public class QueryableStateIntegrationTest {
                         // Kafka Streams instance may have closed but rebalance hasn't happened
                         return false;
                     } catch (final InvalidStateStoreException e) {
-                        // rebalance
-                        assertEquals(streams.state(), KafkaStreams.State.REBALANCING);
+                        // there must have been at least one rebalance state
+                        assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING)
>= 1);
                         return false;
                     }
 
@@ -288,6 +296,7 @@ public class QueryableStateIntegrationTest {
 
 
     private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams
streams,
+                                       final KafkaStreamsTest.StateListenerStub stateListenerStub,
                                        final Set<String> keys, final String storeName,
                                        final Long from, final Long to) throws Exception {
         for (final String key : keys) {
@@ -307,8 +316,8 @@ public class QueryableStateIntegrationTest {
                         // Kafka Streams instance may have closed but rebalance hasn't happened
                         return false;
                     } catch (InvalidStateStoreException e) {
-                        // rebalance
-                        assertEquals(streams.state(), KafkaStreams.State.REBALANCING);
+                        // there must have been at least one rebalance state
+                        assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING)
>= 1);
                         return false;
                     }
 
@@ -341,10 +350,11 @@ public class QueryableStateIntegrationTest {
             waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);
 
             for (int i = 0; i < numThreads; i++) {
-                verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), inputValuesKeys,
+                verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(),
inputValuesKeys,
                     "word-count-store-" + streamThree);
-                verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), inputValuesKeys,
+                verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(),
inputValuesKeys,
                                       "windowed-word-count-store-" + streamThree, 0L, WINDOW_SIZE);
+                assertEquals(streamRunnables[i].getStream().state(), KafkaStreams.State.RUNNING);
             }
 
             // kill N-1 threads
@@ -355,10 +365,11 @@ public class QueryableStateIntegrationTest {
             }
 
             // query from the remaining thread
-            verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), inputValuesKeys,
+            verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(),
inputValuesKeys,
                 "word-count-store-" + streamThree);
-            verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), inputValuesKeys,
+            verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(),
inputValuesKeys,
                                   "windowed-word-count-store-" + streamThree, 0L, WINDOW_SIZE);
+            assertEquals(streamRunnables[0].getStream().state(), KafkaStreams.State.RUNNING);
         } finally {
             for (int i = 0; i < numThreads; i++) {
                 if (!streamRunnables[i].isClosed()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea724497/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 0c2ace9..13678a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -611,7 +611,7 @@ public class StreamThreadTest {
         public StreamThread.State newState = null;
 
         @Override
-        public void onChange(final StreamThread.State newState, final StreamThread.State
oldState) {
+        public void onChange(final StreamThread thread, final StreamThread.State newState,
final StreamThread.State oldState) {
             this.numChanges++;
             if (this.newState != null) {
                 if (this.newState != oldState) {


Mime
View raw message