kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8104: Consumer cannot rejoin to the group after rebalancing (#7460)
Date Thu, 17 Oct 2019 05:54:39 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 21a53b0  KAFKA-8104: Consumer cannot rejoin to the group after rebalancing (#7460)
21a53b0 is described below

commit 21a53b023a1c91e5970bf82374759db05da1fc74
Author: Nikolay <nizhikov@apache.org>
AuthorDate: Thu Oct 17 08:53:14 2019 +0300

    KAFKA-8104: Consumer cannot rejoin to the group after rebalancing (#7460)
    
    This PR contains the fix of race condition bug between "consumer thread" and "consumer
coordinator heartbeat thread". It reproduces in many production environments.
    
    Condition for reproducing:
    
    1. Consumer thread initiates rejoin to the group because of commit timeout. Call of AbstractCoordinator#joinGroupIfNeeded
which leads to sendJoinGroupRequest.
    2. JoinGroupResponseHandler writes to the AbstractCoordinator.this.generation new generation
data and leaves the synchronized section.
    3. Heartbeat thread executes mabeLeaveGroup and clears generation data via resetGenerationOnLeaveGroup.
    4. Consumer thread executes onJoinComplete(generation.generationId, generation.memberId,
generation.protocol, memberAssignment); with the cleared generation data. This leads to the
corresponding exception.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../consumer/internals/AbstractCoordinator.java    | 76 +++++++++++++++-------
 .../internals/ConsumerCoordinatorTest.java         | 47 +++++++++++++
 2 files changed, 100 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a44e7db..057cfb0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -404,14 +404,32 @@ public abstract class AbstractCoordinator implements Closeable {
             }
 
             if (future.succeeded()) {
-                // Duplicate the buffer in case `onJoinComplete` does not complete and needs
to be retried.
-                ByteBuffer memberAssignment = future.value().duplicate();
-                onJoinComplete(generation.generationId, generation.memberId, generation.protocol,
memberAssignment);
+                Generation generationSnapshot;
+
+                // Generation data maybe concurrently cleared by Heartbeat thread.
+                // Can't use synchronized for {@code onJoinComplete}, because it can be long
enough
+                // and  shouldn't block hearbeat thread.
+                // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
+                synchronized (this) {
+                    generationSnapshot = this.generation;
+                }
 
-                // We reset the join group future only after the completion callback returns.
This ensures
-                // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
-                resetJoinGroupFuture();
-                needsJoinPrepare = true;
+                if (generationSnapshot != Generation.NO_GENERATION) {
+                    // Duplicate the buffer in case `onJoinComplete` does not complete and
needs to be retried.
+                    ByteBuffer memberAssignment = future.value().duplicate();
+
+                    onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId,
generationSnapshot.protocol, memberAssignment);
+
+                    // We reset the join group future only after the completion callback
returns. This ensures
+                    // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
+                    resetJoinGroupFuture();
+                    needsJoinPrepare = true;
+                } else {
+                    log.info("Generation data was cleared by heartbeat thread. Initiating
rejoin.");
+                    resetStateAndRejoin();
+
+                    return false;
+                }
             } else {
                 resetJoinGroupFuture();
                 final RuntimeException exception = future.exception();
@@ -433,6 +451,11 @@ public abstract class AbstractCoordinator implements Closeable {
         this.joinFuture = null;
     }
 
+    private void resetStateAndRejoin() {
+        rejoinNeeded = true;
+        state = MemberState.UNJOINED;
+    }
+
     private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
         // we store the join future in case we are woken up by the user after beginning the
         // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
@@ -455,16 +478,21 @@ public abstract class AbstractCoordinator implements Closeable {
                     // handle join completion in the callback so that the callback will be
invoked
                     // even if the consumer is woken up before finishing the rebalance
                     synchronized (AbstractCoordinator.this) {
-                        log.info("Successfully joined group with generation {}", generation.generationId);
-                        state = MemberState.STABLE;
-                        rejoinNeeded = false;
-                        // record rebalance latency
-                        lastRebalanceEndMs = time.milliseconds();
-                        sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
-                        lastRebalanceStartMs = -1L;
-
-                        if (heartbeatThread != null)
-                            heartbeatThread.enable();
+                        if (generation != Generation.NO_GENERATION) {
+                            log.info("Successfully joined group with generation {}", generation.generationId);
+                            state = MemberState.STABLE;
+                            rejoinNeeded = false;
+                            // record rebalance latency
+                            lastRebalanceEndMs = time.milliseconds();
+                            sensors.successfulRebalanceSensor.record(lastRebalanceEndMs -
lastRebalanceStartMs);
+                            lastRebalanceStartMs = -1L;
+
+                            if (heartbeatThread != null)
+                                heartbeatThread.enable();
+                        } else {
+                            log.info("Generation data was cleared by heartbeat thread. Rejoin
failed.");
+                            recordRebalanceFailure();
+                        }
                     }
                 }
 
@@ -473,10 +501,14 @@ public abstract class AbstractCoordinator implements Closeable {
                     // we handle failures below after the request finishes. if the join completes
                     // after having been woken up, the exception is ignored and we will rejoin
                     synchronized (AbstractCoordinator.this) {
-                        state = MemberState.UNJOINED;
-                        sensors.failedRebalanceSensor.record();
+                        recordRebalanceFailure();
                     }
                 }
+
+                private void recordRebalanceFailure() {
+                    state = MemberState.UNJOINED;
+                    sensors.failedRebalanceSensor.record();
+                }
             });
         }
         return joinFuture;
@@ -584,8 +616,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 synchronized (AbstractCoordinator.this) {
                     AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
                             joinResponse.data().memberId(), null);
-                    AbstractCoordinator.this.rejoinNeeded = true;
-                    AbstractCoordinator.this.state = MemberState.UNJOINED;
+                    AbstractCoordinator.this.resetStateAndRejoin();
                 }
                 future.raise(error);
             } else {
@@ -822,8 +853,7 @@ public abstract class AbstractCoordinator implements Closeable {
 
     private synchronized void resetGeneration() {
         this.generation = Generation.NO_GENERATION;
-        this.state = MemberState.UNJOINED;
-        this.rejoinNeeded = true;
+        resetStateAndRejoin();
     }
 
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 068f7bd..930eb54 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2279,6 +2279,53 @@ public class ConsumerCoordinatorTest {
                 coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)),
time.timer(Long.MAX_VALUE)));
     }
 
+    @Test
+    public void testConsumerRejoinAfterRebalance() throws Exception {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false,
Optional.of("group-id"))) {
+            coordinator.ensureActiveGroup();
+
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
+
+            assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(
+                singletonMap(t1p, new OffsetAndMetadata(100L)),
+                time.timer(Long.MAX_VALUE)));
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            client.prepareResponse(joinGroupFollowerResponse(generationId, memberId, "leader",
Errors.NONE));
+
+            MockTime time = new MockTime(1);
+
+            //onJoinPrepare will be executed and onJoinComplete will not.
+            boolean res = coordinator.joinGroupIfNeeded(time.timer(2));
+
+            assertFalse(res);
+            assertFalse(client.hasPendingResponses());
+            //SynGroupRequest not responded.
+            assertEquals(1, client.inFlightRequestCount());
+            assertEquals(generationId, coordinator.generation().generationId);
+            assertEquals(memberId, coordinator.generation().memberId);
+
+            // Imitating heartbeat thread that clears generation data.
+            coordinator.maybeLeaveGroup("Clear generation data.");
+
+            assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
+
+            client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+            //Join future should succeed but generation already cleared so result of join
is false.
+            res = coordinator.joinGroupIfNeeded(time.timer(1));
+
+            assertFalse(res);
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+        }
+    }
+
     private void receiveFencedInstanceIdException() {
         subscriptions.assignFromUser(singleton(t1p));
 


Mime
View raw message