kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021)
Date Mon, 15 Jul 2019 22:35:47 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 0f0093c  KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021)
0f0093c is described below

commit 0f0093c1693d6ea4091228cd3a87db0219d028a2
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Mon Jul 15 15:02:13 2019 -0700

    KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <bruno@confluent.io>, John Roesler <john@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  | 34 +++++++--
 .../processor/internals/StreamThreadTest.java      | 82 ++++++++++++++++++++++
 2 files changed, 109 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 419e181..4b157ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -194,14 +194,20 @@ public class StreamThread extends Thread {
             oldState = state;
 
             if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
+                log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: " +
+                              "only DEAD state is a valid next state", newState);
                 // when the state is already in PENDING_SHUTDOWN, all other transitions will be
                 // refused but we do not throw exception here
                 return null;
             } else if (state == State.DEAD) {
+                log.debug("Ignoring request to transit from DEAD to {}: " +
+                              "no valid next state after DEAD", newState);
                 // when the state is already in NOT_RUNNING, all its transitions
                 // will be refused but we do not throw exception here
                 return null;
             } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
+                log.debug("Ignoring request to transit from PARTITIONS_REVOKED to PARTITIONS_REVOKED: " +
+                              "self transition is not allowed");
                 // when the state is already in PARTITIONS_REVOKED, its transition to itself will be
                 // refused but we do not throw exception here
                 return null;
@@ -272,17 +278,23 @@ public class StreamThread extends Thread {
             final long start = time.milliseconds();
             try {
                 if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
-                    return;
-                }
-                if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) {
+                    log.debug(
+                        "Skipping task creation in rebalance because we are already in {} state.",
+                        streamThread.state()
+                    );
+                } else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) {
+                    log.debug(
+                        "Encountered assignment error during partition assignment: {}. Skipping task initialization",
+                        streamThread.assignmentErrorCode
+                    );
+                } else {
+                    log.debug("Creating tasks based on assignment.");
                     taskManager.createTasks(assignment);
                 }
             } catch (final Throwable t) {
                 log.error(
                     "Error caught during partition assignment, " +
-                        "will abort the current process and re-throw at the end of rebalance: {}",
-                    t
-                );
+                        "will abort the current process and re-throw at the end of rebalance", t);
                 streamThread.setRebalanceException(t);
             } finally {
                 log.info("partition assignment took {} ms.\n" +
@@ -833,7 +845,6 @@ public class StreamThread extends Thread {
     // Visible for testing
     void runOnce() {
         final ConsumerRecords<byte[], byte[]> records;
-
         now = time.milliseconds();
 
         if (state == State.PARTITIONS_ASSIGNED) {
@@ -854,6 +865,15 @@ public class StreamThread extends Thread {
             throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
         }
 
+        // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
+        // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
+        // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
+        // could affect the task manager state beyond this point within #runOnce().
+        if (!isRunning()) {
+            log.debug("State already transits to {}, skipping the run once call after poll request", state);
+            return;
+        }
+
         final long pollLatency = advanceNowAndComputeLatency();
 
         if (records != null && !records.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index f99bb89..7a4e33b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -20,8 +20,10 @@ import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -73,6 +75,7 @@ import org.junit.Test;
 import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -665,6 +668,85 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldNotThrowWhenPendingShutdownInRunOnce() {
+        mockRunOnce(true);
+    }
+
+    @Test
+    public void shouldNotThrowWithoutPendingShutdownInRunOnce() {
+        // A reference test to verify that without intermediate shutdown the runOnce should pass
+        // without any exception.
+        mockRunOnce(false);
+    }
+
+    private void mockRunOnce(final boolean shutdownOnPoll) {
+        final Collection<TopicPartition> assignedPartitions = Collections.singletonList(t1p1);
+        class MockStreamThreadConsumer<K, V> extends MockConsumer<K, V> {
+
+            private StreamThread streamThread;
+
+            private MockStreamThreadConsumer(final OffsetResetStrategy offsetResetStrategy) {
+                super(offsetResetStrategy);
+            }
+
+            @Override
+            public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
+                assertNotNull(streamThread);
+                if (shutdownOnPoll) {
+                    streamThread.shutdown();
+                }
+                streamThread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+                return super.poll(timeout);
+            }
+
+            private void setStreamThread(final StreamThread streamThread) {
+                this.streamThread = streamThread;
+            }
+        }
+
+        final MockStreamThreadConsumer<byte[], byte[]> mockStreamThreadConsumer =
+            new MockStreamThreadConsumer<>(OffsetResetStrategy.EARLIEST);
+
+        final TaskManager taskManager = new TaskManager(new MockChangelogReader(),
+                                                        processId,
+                                                        "log-prefix",
+                                                        mockStreamThreadConsumer,
+                                                        streamsMetadataState,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        new AssignedStreamsTasks(new LogContext()),
+                                                        new AssignedStandbyTasks(new LogContext()));
+        taskManager.setConsumer(mockStreamThreadConsumer);
+        taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
+
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, clientId);
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            mockStreamThreadConsumer,
+            mockStreamThreadConsumer,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            clientId,
+            new LogContext(""),
+            new AtomicInteger()
+        ).updateThreadMetadata(getSharedAdminClientId(clientId));
+
+        mockStreamThreadConsumer.setStreamThread(thread);
+        mockStreamThreadConsumer.assign(assignedPartitions);
+        mockStreamThreadConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+
+        addRecord(mockStreamThreadConsumer, 1L, 0L);
+        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+        thread.runOnce();
+    }
+
+    @Test
     public void shouldOnlyShutdownOnce() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);


Mime
View raw message