kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.8 updated: KAFKA-13214; Consumer should not reset state after retriable error in rebalance (#11231)
Date Tue, 24 Aug 2021 19:18:55 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 6c62600  KAFKA-13214; Consumer should not reset state after retriable error in rebalance (#11231)
6c62600 is described below

commit 6c62600ba0b6a484232ff1e8ac28fef8e7c199e4
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Aug 24 11:59:38 2021 -0700

    KAFKA-13214; Consumer should not reset state after retriable error in rebalance (#11231)
    
    Currently the consumer will reset state after any retriable error during a rebalance.
    This includes coordinator disconnects as well as coordinator changes. The impact of this is
    that rebalances get delayed since they will be blocked until the session timeout of the old
    memberId expires.
    
    The patch here fixes the problem by not resetting the member state after a retriable error.
    
    Reviewers: David Jacot <djacot@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../consumer/internals/AbstractCoordinator.java    |   4 +-
 .../internals/AbstractCoordinatorTest.java         | 134 +++++++++++++++++++++
 .../internals/ConsumerCoordinatorTest.java         |  20 ++-
 3 files changed, 151 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index aff5571..3fa4ba2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -471,6 +471,9 @@ public abstract class AbstractCoordinator implements Closeable {
                 }
 
                 resetJoinGroupFuture();
+                rejoinNeeded = true;
+
+                resetJoinGroupFuture();
                 if (exception instanceof UnknownMemberIdException ||
                     exception instanceof RebalanceInProgressException ||
                     exception instanceof IllegalGenerationException ||
@@ -479,7 +482,6 @@ public abstract class AbstractCoordinator implements Closeable {
                 else if (!future.isRetriable())
                     throw exception;
 
-                resetStateAndRejoin();
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index c88cf74..6e3caae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
+import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
@@ -431,6 +432,139 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
+    public void testRetainMemberIdAfterJoinGroupDisconnect() {
+        setupCoordinator();
+
+        String memberId = "memberId";
+        int generation = 5;
+
+        // Rebalance once to initialize the generation and memberId
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        expectJoinGroup("", generation, memberId);
+        expectSyncGroup(generation, memberId);
+        ensureActiveGroup(generation, memberId);
+
+        // Force a rebalance
+        coordinator.requestRejoin();
+        assertTrue(coordinator.rejoinNeededOrPending());
+
+        // Disconnect during the JoinGroup and ensure that the retry preserves the memberId
+        int rejoinedGeneration = 10;
+        expectDisconnectInJoinGroup(memberId);
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        expectJoinGroup(memberId, rejoinedGeneration, memberId);
+        expectSyncGroup(rejoinedGeneration, memberId);
+        ensureActiveGroup(rejoinedGeneration, memberId);
+    }
+
+    @Test
+    public void testRetainMemberIdAfterSyncGroupDisconnect() {
+        setupCoordinator();
+
+        String memberId = "memberId";
+        int generation = 5;
+
+        // Rebalance once to initialize the generation and memberId
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        expectJoinGroup("", generation, memberId);
+        expectSyncGroup(generation, memberId);
+        ensureActiveGroup(generation, memberId);
+
+        // Force a rebalance
+        coordinator.requestRejoin();
+        assertTrue(coordinator.rejoinNeededOrPending());
+
+        // Disconnect during the SyncGroup and ensure that the retry preserves the memberId
+        int rejoinedGeneration = 10;
+        expectJoinGroup(memberId, rejoinedGeneration, memberId);
+        expectDisconnectInSyncGroup(rejoinedGeneration, memberId);
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        // Note that the consumer always starts from JoinGroup after a failed rebalance
+        expectJoinGroup(memberId, rejoinedGeneration, memberId);
+        expectSyncGroup(rejoinedGeneration, memberId);
+        ensureActiveGroup(rejoinedGeneration, memberId);
+    }
+
+    private void ensureActiveGroup(
+        int generation,
+        String memberId
+    ) {
+        coordinator.ensureActiveGroup();
+        assertEquals(generation, coordinator.generation().generationId);
+        assertEquals(memberId, coordinator.generation().memberId);
+        assertFalse(coordinator.rejoinNeededOrPending());
+    }
+
+    private void expectSyncGroup(
+        int expectedGeneration,
+        String expectedMemberId
+    ) {
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof SyncGroupRequest)) {
+                return false;
+            }
+            SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) body).data();
+            return syncGroupRequest.generationId() == expectedGeneration
+                && syncGroupRequest.memberId().equals(expectedMemberId)
+                && syncGroupRequest.protocolType().equals(PROTOCOL_TYPE)
+                && syncGroupRequest.protocolName().equals(PROTOCOL_NAME);
+        }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, PROTOCOL_NAME));
+    }
+
+    private void expectDisconnectInSyncGroup(
+        int expectedGeneration,
+        String expectedMemberId
+    ) {
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof SyncGroupRequest)) {
+                return false;
+            }
+            SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) body).data();
+            return syncGroupRequest.generationId() == expectedGeneration
+                && syncGroupRequest.memberId().equals(expectedMemberId)
+                && syncGroupRequest.protocolType().equals(PROTOCOL_TYPE)
+                && syncGroupRequest.protocolName().equals(PROTOCOL_NAME);
+        }, null, true);
+    }
+
+    private void expectDisconnectInJoinGroup(
+        String expectedMemberId
+    ) {
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof JoinGroupRequest)) {
+                return false;
+            }
+            JoinGroupRequestData joinGroupRequest = ((JoinGroupRequest) body).data();
+            return joinGroupRequest.memberId().equals(expectedMemberId)
+                && joinGroupRequest.protocolType().equals(PROTOCOL_TYPE);
+        }, null, true);
+    }
+
+    private void expectJoinGroup(
+        String expectedMemberId,
+        int responseGeneration,
+        String responseMemberId
+    ) {
+        JoinGroupResponse response = joinGroupFollowerResponse(
+            responseGeneration,
+            responseMemberId,
+            "leaderId",
+            Errors.NONE,
+            PROTOCOL_TYPE
+        );
+
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof JoinGroupRequest)) {
+                return false;
+            }
+            JoinGroupRequestData joinGroupRequest = ((JoinGroupRequest) body).data();
+            return joinGroupRequest.memberId().equals(expectedMemberId)
+                && joinGroupRequest.protocolType().equals(PROTOCOL_TYPE);
+        }, response);
+    }
+
+    @Test
     public void testNoGenerationWillNotTriggerProtocolNameCheck() {
         final String wrongProtocolName = "wrong-name";
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a73f664..5cd56bb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1037,7 +1037,14 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.rejoinNeededOrPending());
 
-        client.respond(joinGroupLeaderResponse(2, consumerId, initialSubscription, Errors.NONE));
+        client.respond(request -> {
+            if (!(request instanceof JoinGroupRequest)) {
+                return false;
+            } else {
+                JoinGroupRequest joinRequest = (JoinGroupRequest) request;
+                return consumerId.equals(joinRequest.data().memberId());
+            }
+        }, joinGroupLeaderResponse(2, consumerId, initialSubscription, Errors.NONE));
         client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
@@ -1045,12 +1052,13 @@ public abstract class ConsumerCoordinatorTest {
         Collection<TopicPartition> revoked = getRevoked(partitions, partitions);
         assertEquals(revoked.isEmpty() ? 0 : 1, rebalanceListener.revokedCount);
         assertEquals(revoked.isEmpty() ? null : revoked, rebalanceListener.revoked);
-        Collection<TopicPartition> lost = getLost(partitions);
-        assertEquals(lost.isEmpty() ? 0 : 1, rebalanceListener.lostCount);
-        assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
+        // No partitions have been lost since the rebalance failure was not fatal
+        assertEquals(0, rebalanceListener.lostCount);
+        assertNull(rebalanceListener.lost);
+
+        Collection<TopicPartition> added = getAdded(partitions, partitions);
         assertEquals(2, rebalanceListener.assignedCount);
-        // Since onPartitionsLost is invoked when the JoinGroup failed, all owned partitions have to be re-added
-        assertEquals(toSet(partitions), rebalanceListener.assigned);
+        assertEquals(added.isEmpty() ? Collections.emptySet() : toSet(partitions), rebalanceListener.assigned);
         assertEquals(toSet(partitions), subscriptions.assignedPartitions());
     }
 

Mime
View raw message