kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From boy...@apache.org
Subject [kafka] branch trunk updated: KAFKA-10321: fix infinite blocking for global stream thread startup (#9095)
Date Thu, 30 Jul 2020 04:05:22 GMT
This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2f64f6d  KAFKA-10321: fix infinite blocking for global stream thread startup (#9095)
2f64f6d is described below

commit 2f64f6deb906cdfe4d006e530edeff2e79c05f76
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Wed Jul 29 21:04:21 2020 -0700

    KAFKA-10321: fix infinite blocking for global stream thread startup (#9095)
    
    The start() function for global stream thread only checks whether the thread is not running,
    as it needs to block until it finishes the initialization. This PR fixes this behavior by
    adding a check whether the thread is already in error state as well.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, John Roesler <vvcephei@apache.org>
---
 .../processor/internals/GlobalStreamThread.java    | 26 ++++++++++++++++++++--
 .../internals/GlobalStreamThreadTest.java          | 23 ++++++++++++-------
 2 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 14d1ef8..236940ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -42,8 +42,10 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.CREATED;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
+import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
 
 /**
  * This is the thread responsible for keeping all Global State Stores updated.
@@ -106,6 +108,10 @@ public class GlobalStreamThread extends Thread {
             return equals(RUNNING);
         }
 
+        public boolean inErrorState() {
+            return equals(DEAD) || equals(PENDING_SHUTDOWN);
+        }
+
         @Override
         public boolean isValidTransition(final ThreadStateTransitionValidator newState) {
             final State tmpState = (State) newState;
@@ -173,6 +179,18 @@ public class GlobalStreamThread extends Thread {
         }
     }
 
+    public boolean inErrorState() {
+        synchronized (stateLock) {
+            return state.inErrorState();
+        }
+    }
+
+    public boolean stillInitializing() {
+        synchronized (stateLock) {
+            return state.equals(CREATED);
+        }
+    }
+
     public GlobalStreamThread(final ProcessorTopology topology,
                               final StreamsConfig config,
                               final Consumer<byte[], byte[]> globalConsumer,
@@ -276,7 +294,7 @@ public class GlobalStreamThread extends Thread {
 
             return;
         }
-        setState(State.RUNNING);
+        setState(RUNNING);
 
         boolean wipeStateStore = false;
         try {
@@ -384,12 +402,16 @@ public class GlobalStreamThread extends Thread {
     @Override
     public synchronized void start() {
         super.start();
-        while (!stillRunning()) {
+        while (stillInitializing()) {
             Utils.sleep(1);
             if (startupException != null) {
                 throw startupException;
             }
         }
+
+        if (inErrorState()) {
+            throw new IllegalStateException("Initialization for the global stream thread failed");
+        }
     }
 
     public void shutdown() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 15bae37..1e31357 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -163,14 +163,14 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldBeRunningAfterSuccessfulStart() {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
         assertTrue(globalStreamThread.stillRunning());
     }
 
     @Test(timeout = 30000)
     public void shouldStopRunningWhenClosedByUser() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
         globalStreamThread.shutdown();
         globalStreamThread.join();
         assertEquals(GlobalStreamThread.State.DEAD, globalStreamThread.state());
@@ -179,7 +179,7 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldCloseStateStoresOnClose() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
         final StateStore globalStore = builder.globalStateStores().get(GLOBAL_STORE_NAME);
         assertTrue(globalStore.isOpen());
         globalStreamThread.shutdown();
@@ -190,7 +190,7 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldTransitionToDeadOnClose() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
         globalStreamThread.shutdown();
         globalStreamThread.join();
 
@@ -200,7 +200,7 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldStayDeadAfterTwoCloses() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
         globalStreamThread.shutdown();
         globalStreamThread.join();
         globalStreamThread.shutdown();
@@ -211,7 +211,7 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldTransitionToRunningOnStart() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
 
         TestUtils.waitForCondition(
             () -> globalStreamThread.state() == RUNNING,
@@ -231,7 +231,7 @@ public class GlobalStreamThreadTest {
             }
         });
 
-        globalStreamThread.start();
+        startAndSwallowError();
 
         TestUtils.waitForCondition(
             () -> globalStreamThread.state() == DEAD,
@@ -245,7 +245,7 @@ public class GlobalStreamThreadTest {
     @Test
     public void shouldDieOnInvalidOffsetExceptionWhileRunning() throws Exception {
         initializeConsumer();
-        globalStreamThread.start();
+        startAndSwallowError();
 
         TestUtils.waitForCondition(
             () -> globalStreamThread.state() == RUNNING,
@@ -289,4 +289,11 @@ public class GlobalStreamThreadTest {
         mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 0L));
         mockConsumer.assign(Collections.singleton(topicPartition));
     }
+
+    private void startAndSwallowError() {
+        try {
+            globalStreamThread.start();
+        } catch (final IllegalStateException ignored) {
+        }
+    }
 }


Mime
View raw message