kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8062: Do not remove StateListener when shutting down stream thread (#6468)
Date Tue, 19 Mar 2019 14:22:25 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 8e734fc  KAFKA-8062: Do not remove StateListener when shutting down stream thread
(#6468)
8e734fc is described below

commit 8e734fcb5c3eb232db08f1986f3613799fa57fb2
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Tue Mar 19 07:12:49 2019 -0700

    KAFKA-8062: Do not remove StateListener when shutting down stream thread (#6468)
    
    In a previous commit (#6091) we fixed a couple of edge cases, and hence no longer need to
remove the state listener (before that, we removed the state listener intentionally to
avoid some race conditions, which are now gone).
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>,   Bill Bejeck <bbejeck@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  4 +-
 .../streams/processor/internals/StreamThread.java  |  1 -
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 72 ++++++++++++++++++++--
 3 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2d1aa79..3c8ee16 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -191,7 +191,7 @@ public class KafkaStreams implements AutoCloseable {
      *   the instance will be in the ERROR state. The user will need to close it.
      */
     public enum State {
-        CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING,
ERROR(3, 5);
+        CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING,
ERROR(3);
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
@@ -857,7 +857,6 @@ public class KafkaStreams implements AutoCloseable {
                 // notify all the threads to stop; avoid deadlocks by stopping any
                 // further state reports from the thread since we're shutting down
                 for (final StreamThread thread : threads) {
-                    thread.setStateListener(null);
                     thread.shutdown();
                 }
 
@@ -872,7 +871,6 @@ public class KafkaStreams implements AutoCloseable {
                 }
 
                 if (globalStreamThread != null) {
-                    globalStreamThread.setStateListener(null);
                     globalStreamThread.shutdown();
                 }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 71df0f9..1bd09e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -269,7 +269,6 @@ public class StreamThread extends Thread {
             if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
{
                 log.error("Received error code {} - shutdown", streamThread.assignmentErrorCode.get());
                 streamThread.shutdown();
-                streamThread.setStateListener(null);
                 return;
             }
             final long start = time.milliseconds();
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 6b8b5b5..3e55f29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -140,14 +140,14 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testStateCloseAfterCreate() {
+    public void stateShouldTransitToNotRunningIfCloseRightAfterCreated() {
         globalStreams.close();
 
         Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
     }
 
     @Test
-    public void testStateOneThreadDeadButRebalanceFinish() throws InterruptedException {
+    public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException
{
         final StateListenerStub stateListener = new StateListenerStub();
         globalStreams.setStateListener(stateListener);
 
@@ -171,7 +171,7 @@ public class KafkaStreamsTest {
         Assert.assertEquals(3, stateListener.numChanges);
         Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
 
-        for (final StreamThread thread: globalStreams.threads) {
+        for (final StreamThread thread : globalStreams.threads) {
             thread.stateListener().onChange(
                 thread,
                 StreamThread.State.PARTITIONS_ASSIGNED,
@@ -194,7 +194,7 @@ public class KafkaStreamsTest {
         Assert.assertEquals(3, stateListener.numChanges);
         Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
 
-        for (final StreamThread thread: globalStreams.threads) {
+        for (final StreamThread thread : globalStreams.threads) {
             if (thread != globalStreams.threads[NUM_THREADS - 1]) {
                 thread.stateListener().onChange(
                     thread,
@@ -215,6 +215,70 @@ public class KafkaStreamsTest {
     }
 
     @Test
+    public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
+        final StateListenerStub stateListener = new StateListenerStub();
+        globalStreams.setStateListener(stateListener);
+
+        Assert.assertEquals(0, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state());
+
+        globalStreams.start();
+
+        TestUtils.waitForCondition(
+            () -> stateListener.numChanges == 2,
+            "Streams never started.");
+        Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());
+
+        for (final StreamThread thread : globalStreams.threads) {
+            thread.stateListener().onChange(
+                thread,
+                StreamThread.State.PARTITIONS_REVOKED,
+                StreamThread.State.RUNNING);
+        }
+
+        Assert.assertEquals(3, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
+
+        globalStreams.threads[NUM_THREADS - 1].stateListener().onChange(
+            globalStreams.threads[NUM_THREADS - 1],
+            StreamThread.State.PENDING_SHUTDOWN,
+            StreamThread.State.PARTITIONS_REVOKED);
+
+        globalStreams.threads[NUM_THREADS - 1].stateListener().onChange(
+            globalStreams.threads[NUM_THREADS - 1],
+            StreamThread.State.DEAD,
+            StreamThread.State.PENDING_SHUTDOWN);
+
+        Assert.assertEquals(3, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state());
+
+        for (final StreamThread thread : globalStreams.threads) {
+            if (thread != globalStreams.threads[NUM_THREADS - 1]) {
+                thread.stateListener().onChange(
+                    thread,
+                    StreamThread.State.PENDING_SHUTDOWN,
+                    StreamThread.State.PARTITIONS_REVOKED);
+
+                thread.stateListener().onChange(
+                    thread,
+                    StreamThread.State.DEAD,
+                    StreamThread.State.PENDING_SHUTDOWN);
+            }
+        }
+
+        Assert.assertEquals(4, stateListener.numChanges);
+        Assert.assertEquals(KafkaStreams.State.ERROR, globalStreams.state());
+
+        globalStreams.close();
+
+        // the state should not stuck with ERROR, but transit to NOT_RUNNING in the end
+        TestUtils.waitForCondition(
+            () -> stateListener.numChanges == 6,
+            "Streams never closed.");
+        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
+    }
+
+    @Test
     public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
         builder.globalTable("anyTopic");
         final List<Node> nodes = Collections.singletonList(new Node(0, "localhost",
8121));


Mime
View raw message