kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [kafka] branch trunk updated: KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)
Date Thu, 24 Jun 2021 15:42:11 GMT
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 574af88  KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)
574af88 is described below

commit 574af88305273f21456a9b10f21c182181cfc600
Author: David Jacot <djacot@confluent.io>
AuthorDate: Thu Jun 24 17:40:40 2021 +0200

    KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)
    
    This patch fixes the unsynchronized accesses to `AbstractCoordinator.state`.
    
    Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
---
 .../consumer/internals/AbstractCoordinator.java    | 14 +++++----
 .../consumer/internals/ConsumerCoordinator.java    | 34 +++++++++++++---------
 2 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index cd1daa8..9fbfe1f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -1119,12 +1119,14 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                 // since we may be sending the request during rebalance, we should check
                 // this case and ignore the REBALANCE_IN_PROGRESS error
-                if (state == MemberState.STABLE) {
-                    requestRejoin("group is already rebalancing");
-                    future.raise(error);
-                } else {
-                    log.debug("Ignoring heartbeat response with error {} during {} state",
error, state);
-                    future.complete(null);
+                synchronized (AbstractCoordinator.this) {
+                    if (state == MemberState.STABLE) {
+                        requestRejoin("group is already rebalancing");
+                        future.raise(error);
+                    } else {
+                        log.debug("Ignoring heartbeat response with error {} during {} state",
error, state);
+                        future.complete(null);
+                    }
                 }
             } else if (error == Errors.ILLEGAL_GENERATION ||
                        error == Errors.UNKNOWN_MEMBER_ID ||
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 226297a..39f4520 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1216,13 +1216,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator
{
                             if (generationUnchanged()) {
                                 future.raise(error);
                             } else {
-                                if (ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE)
{
-                                    future.raise(new RebalanceInProgressException("Offset
commit cannot be completed since the " +
-                                        "consumer member's old generation is fenced by its
group instance id, it is possible that " +
-                                        "this consumer has already participated another rebalance
and got a new generation"));
-                                } else {
-                                    future.raise(new CommitFailedException());
+                                KafkaException exception;
+                                synchronized (ConsumerCoordinator.this) {
+                                    if (ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE)
{
+                                        exception = new RebalanceInProgressException("Offset
commit cannot be completed since the " +
+                                            "consumer member's old generation is fenced by
its group instance id, it is possible that " +
+                                            "this consumer has already participated another
rebalance and got a new generation");
+                                    } else {
+                                        exception = new CommitFailedException();
+                                    }
                                 }
+                                future.raise(exception);
                             }
                             return;
                         } else if (error == Errors.REBALANCE_IN_PROGRESS) {
@@ -1245,14 +1249,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator
{
 
                             // only need to reset generation and re-join group if generation
has not changed or we are not in rebalancing;
                             // otherwise only raise rebalance-in-progress error
-                            if (!generationUnchanged() && ConsumerCoordinator.this.state
== MemberState.PREPARING_REBALANCE) {
-                                future.raise(new RebalanceInProgressException("Offset commit
cannot be completed since the " +
-                                    "consumer member's generation is already stale, meaning
it has already participated another rebalance and " +
-                                    "got a new generation. You can try completing the rebalance
by calling poll() and then retry commit again"));
-                            } else {
-                                resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
-                                future.raise(new CommitFailedException());
+                            KafkaException exception;
+                            synchronized (ConsumerCoordinator.this) {
+                                if (!generationUnchanged() && ConsumerCoordinator.this.state
== MemberState.PREPARING_REBALANCE) {
+                                    exception = new RebalanceInProgressException("Offset
commit cannot be completed since the " +
+                                        "consumer member's generation is already stale, meaning
it has already participated another rebalance and " +
+                                        "got a new generation. You can try completing the
rebalance by calling poll() and then retry commit again");
+                                } else {
+                                    resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT,
error);
+                                    exception = new CommitFailedException();
+                                }
                             }
+                            future.raise(exception);
                             return;
                         } else {
                             future.raise(new KafkaException("Unexpected error in commit:
" + error.message()));

Mime
View raw message