kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021)
Date Mon, 15 Jul 2019 23:12:37 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 8fb9eb8  KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021)
8fb9eb8 is described below

commit 8fb9eb87944ff67d898532840047477f6c747393
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Mon Jul 15 15:02:13 2019 -0700

    KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <bruno@confluent.io>, John Roesler <john@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  | 34 ++++++--
 .../processor/internals/StreamThreadTest.java      | 94 +++++++++++++++++++++-
 2 files changed, 120 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3a19aa7..db6760c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -189,14 +189,20 @@ public class StreamThread extends Thread {
             oldState = state;
 
             if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
+                log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: " +
+                              "only DEAD state is a valid next state", newState);
                 // when the state is already in PENDING_SHUTDOWN, all other transitions will be
                 // refused but we do not throw exception here
                 return null;
             } else if (state == State.DEAD) {
+                log.debug("Ignoring request to transit from DEAD to {}: " +
+                              "no valid next state after DEAD", newState);
                 // when the state is already in NOT_RUNNING, all its transitions
                 // will be refused but we do not throw exception here
                 return null;
             } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
+                log.debug("Ignoring request to transit from PARTITIONS_REVOKED to PARTITIONS_REVOKED: " +
+                              "self transition is not allowed");
                 // when the state is already in PARTITIONS_REVOKED, its transition to itself will be
                 // refused but we do not throw exception here
                 return null;
@@ -268,17 +274,23 @@ public class StreamThread extends Thread {
             final long start = time.milliseconds();
             try {
                 if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
-                    return;
-                }
-                if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) {
+                    log.debug(
+                        "Skipping task creation in rebalance because we are already in {} state.",
+                        streamThread.state()
+                    );
+                } else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) {
+                    log.debug(
+                        "Encountered assignment error during partition assignment: {}. Skipping task initialization",
+                        streamThread.assignmentErrorCode
+                    );
+                } else {
+                    log.debug("Creating tasks based on assignment.");
                     taskManager.createTasks(assignment);
                 }
             } catch (final Throwable t) {
                 log.error(
                     "Error caught during partition assignment, " +
-                        "will abort the current process and re-throw at the end of rebalance: {}",
-                    t
-                );
+                        "will abort the current process and re-throw at the end of rebalance", t);
                 streamThread.setRebalanceException(t);
             } finally {
                 log.info("partition assignment took {} ms.\n" +
@@ -805,7 +817,6 @@ public class StreamThread extends Thread {
     // Visible for testing
     void runOnce() {
         final ConsumerRecords<byte[], byte[]> records;
-
         now = time.milliseconds();
 
         if (state == State.PARTITIONS_ASSIGNED) {
@@ -826,6 +837,15 @@ public class StreamThread extends Thread {
             throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
         }
 
+        // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
+        // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
+        // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
+        // could affect the task manager state beyond this point within #runOnce().
+        if (!isRunning()) {
+            log.debug("State already transits to {}, skipping the run once call after poll request", state);
+            return;
+        }
+
         final long pollLatency = advanceNowAndComputeLatency();
 
         if (records != null && !records.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e9d413f..9246ac5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -16,13 +16,14 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.time.Duration;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -76,8 +77,10 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -676,6 +679,95 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
+    public void shouldNotThrowWhenPendingShutdownInRunOnce() {
+        mockRunOnce(true);
+    }
+
+    @Test
+    public void shouldNotThrowWithoutPendingShutdownInRunOnce() {
+        // A reference test to verify that without intermediate shutdown the runOnce should pass
+        // without any exception.
+        mockRunOnce(false);
+    }
+
+    private void mockRunOnce(final boolean shutdownOnPoll) {
+        final Collection<TopicPartition> assignedPartitions = Collections.singletonList(t1p1);
+        class MockStreamThreadConsumer<K, V> extends MockConsumer<K, V> {
+
+            private StreamThread streamThread;
+
+            private MockStreamThreadConsumer(final OffsetResetStrategy offsetResetStrategy) {
+                super(offsetResetStrategy);
+            }
+
+            @Override
+            public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
+                assertNotNull(streamThread);
+                if (shutdownOnPoll) {
+                    streamThread.shutdown();
+                }
+                streamThread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+                return super.poll(timeout);
+            }
+
+            private void setStreamThread(final StreamThread streamThread) {
+                this.streamThread = streamThread;
+            }
+        }
+
+        final MockStreamThreadConsumer<byte[], byte[]> mockStreamThreadConsumer =
+            new MockStreamThreadConsumer<>(OffsetResetStrategy.EARLIEST);
+
+        final TaskManager taskManager = new TaskManager(new MockChangelogReader(),
+                                                        processId,
+                                                        "log-prefix",
+                                                        mockStreamThreadConsumer,
+                                                        streamsMetadataState,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        new AssignedStreamsTasks(new LogContext()),
+                                                        new AssignedStandbyTasks(new LogContext()));
+        taskManager.setConsumer(mockStreamThreadConsumer);
+        taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
+
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, clientId);
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            mockStreamThreadConsumer,
+            mockStreamThreadConsumer,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            clientId,
+            new LogContext(""),
+            new AtomicInteger()
+        );
+
+        mockStreamThreadConsumer.setStreamThread(thread);
+        mockStreamThreadConsumer.assign(assignedPartitions);
+        mockStreamThreadConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+
+        mockStreamThreadConsumer.addRecord(new ConsumerRecord<>(
+            t1p1.topic(),
+            t1p1.partition(),
+            1L,
+            0L,
+            TimestampType.CREATE_TIME,
+            ConsumerRecord.NULL_CHECKSUM,
+            -1,
+            -1,
+            new byte[0],
+            new byte[0]));
+        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+        thread.runOnce();
+    }
+
+    @Test
     public void shouldOnlyShutdownOnce() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);


Mime
View raw message