kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9140: Also reset join future when generation was reset in order to re-join (#7647)
Date Wed, 06 Nov 2019 17:50:32 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 78679cf  KAFKA-9140: Also reset join future when generation was reset in order to re-join (#7647)
78679cf is described below

commit 78679cf8ce283648b4b934f098edac3fb7916f1a
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Wed Nov 6 09:47:08 2019 -0800

    KAFKA-9140: Also reset join future when generation was reset in order to re-join (#7647)
    
    Otherwise the join-group request would not be resent, and we would just fall into an endless loop.
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>
---
 .../clients/consumer/internals/AbstractCoordinator.java    | 10 ++++++----
 .../consumer/internals/ConsumerCoordinatorTest.java        | 14 ++++++++++++--
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 3a76276..62f720e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -410,7 +410,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 // Can't use synchronized for {@code onJoinComplete}, because it can be long
enough
                 // and  shouldn't block hearbeat thread.
                 // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
-                synchronized (this) {
+                synchronized (AbstractCoordinator.this) {
                     generationSnapshot = this.generation;
                 }
 
@@ -420,14 +420,16 @@ public abstract class AbstractCoordinator implements Closeable {
 
                     onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId,
generationSnapshot.protocol, memberAssignment);
 
-                    // We reset the join group future only after the completion callback
returns. This ensures
+                    // Generally speaking we should always resetJoinGroupFuture once the
future is done, but here
+                    // we can only reset the join group future after the completion callback
returns. This ensures
                     // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
+                    // And because of that we should explicitly trigger resetJoinGroupFuture
in other conditions below.
                     resetJoinGroupFuture();
                     needsJoinPrepare = true;
                 } else {
                     log.info("Generation data was cleared by heartbeat thread. Initiating
rejoin.");
                     resetStateAndRejoin();
-
+                    resetJoinGroupFuture();
                     return false;
                 }
             } else {
@@ -451,7 +453,7 @@ public abstract class AbstractCoordinator implements Closeable {
         this.joinFuture = null;
     }
 
-    private void resetStateAndRejoin() {
+    private synchronized void resetStateAndRejoin() {
         rejoinNeeded = true;
         state = MemberState.UNJOINED;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 5ff9761..6617fa2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2333,7 +2333,7 @@ public class ConsumerCoordinatorTest {
 
             assertFalse(res);
             assertFalse(client.hasPendingResponses());
-            //SynGroupRequest not responded.
+            // SynGroupRequest not responded.
             assertEquals(1, client.inFlightRequestCount());
             assertEquals(generationId, coordinator.generation().generationId);
             assertEquals(memberId, coordinator.generation().memberId);
@@ -2345,12 +2345,22 @@ public class ConsumerCoordinatorTest {
 
             client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-            //Join future should succeed but generation already cleared so result of join
is false.
+            // Join future should succeed but generation already cleared so result of join
is false.
             res = coordinator.joinGroupIfNeeded(time.timer(1));
 
             assertFalse(res);
             assertFalse(client.hasPendingResponses());
             assertFalse(client.hasInFlightRequests());
+
+            // Retry join should then succeed
+            client.prepareResponse(joinGroupFollowerResponse(generationId, memberId, "leader",
Errors.NONE));
+            client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+            res = coordinator.joinGroupIfNeeded(time.timer(2));
+
+            assertTrue(res);
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
         }
     }
 


Mime
View raw message