kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8653; Default rebalance timeout to session timeout for JoinGroup v0 (#7072)
Date Thu, 11 Jul 2019 06:19:37 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ebb80f5  KAFKA-8653; Default rebalance timeout to session timeout for JoinGroup v0
(#7072)
ebb80f5 is described below

commit ebb80f568da59cf60c76170176584bcb34d88ecb
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Jul 10 23:19:04 2019 -0700

    KAFKA-8653; Default rebalance timeout to session timeout for JoinGroup v0 (#7072)
    
    The rebalance timeout was added to the JoinGroup protocol in version 1. Prior to 2.3,
    we handled version 0 JoinGroup requests by setting the rebalance timeout to be equal
    to the session timeout. We lost this logic when we converted the API to use the
    generated schema definition (#6419) which uses the default value of -1. The impact
    of this is that the group rebalance timeout becomes 0, so rebalances finish immediately
    after we enter the PrepareRebalance state and kick out all old members. This causes
    consumer groups to enter an endless rebalance loop. This patch restores the old
    behavior.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/common/requests/JoinGroupRequest.java    | 15 ++++++++++----
 .../common/requests/JoinGroupRequestTest.java      | 24 ++++++++++++++++++++++
 .../kafka/common/requests/RequestResponseTest.java | 12 +----------
 3 files changed, 36 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index a6ad17e..95d125d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -54,7 +54,6 @@ public class JoinGroupRequest extends AbstractRequest {
     }
 
     private final JoinGroupRequestData data;
-    private final short version;
 
     public static final String UNKNOWN_MEMBER_ID = "";
 
@@ -95,13 +94,21 @@ public class JoinGroupRequest extends AbstractRequest {
     public JoinGroupRequest(JoinGroupRequestData data, short version) {
         super(ApiKeys.JOIN_GROUP, version);
         this.data = data;
-        this.version = version;
+        maybeOverrideRebalanceTimeout(version);
     }
 
     public JoinGroupRequest(Struct struct, short version) {
         super(ApiKeys.JOIN_GROUP, version);
         this.data = new JoinGroupRequestData(struct, version);
-        this.version = version;
+        maybeOverrideRebalanceTimeout(version);
+    }
+
+    private void maybeOverrideRebalanceTimeout(short version) {
+        if (version == 0) {
+            // Version 0 has no rebalance timeout, so we use the session timeout
+            // to be consistent with the original behavior of the API.
+            data.setRebalanceTimeoutMs(data.sessionTimeoutMs());
+        }
     }
 
     public JoinGroupRequestData data() {
@@ -149,6 +156,6 @@ public class JoinGroupRequest extends AbstractRequest {
 
     @Override
     protected Struct toStruct() {
-        return data.toStruct(version);
+        return data.toStruct(version());
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
index 644af9a..9d8031c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
@@ -19,11 +19,14 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
@@ -75,4 +78,25 @@ public class JoinGroupRequestTest {
                         .setProtocolType("consumer")
         ).build((short) 4);
     }
+
+    @Test
+    public void testRebalanceTimeoutDefaultsToSessionTimeoutV0() {
+        int sessionTimeoutMs = 30000;
+
+        Struct struct = new JoinGroupRequestData()
+                .setGroupId("groupId")
+                .setMemberId("consumerId")
+                .setGroupInstanceId("groupInstanceId")
+                .setProtocolType("consumer")
+                .setSessionTimeoutMs(sessionTimeoutMs)
+                .toStruct((short) 0);
+
+        ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
+        struct.writeTo(buffer);
+        buffer.flip();
+
+        JoinGroupRequest request = JoinGroupRequest.parse(buffer, (short) 0);
+        assertEquals(sessionTimeoutMs, request.data().sessionTimeoutMs());
+        assertEquals(sessionTimeoutMs, request.data().rebalanceTimeoutMs());
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 11c19b9..f8d45ab 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -819,17 +819,7 @@ public class RequestResponseTest {
                         .setName("consumer-range")
                         .setMetadata(new byte[0])).iterator()
         );
-        if (version == 0) {
-            return new JoinGroupRequest.Builder(
-                    new JoinGroupRequestData()
-                            .setGroupId("group1")
-                            .setSessionTimeoutMs(30000)
-                            .setMemberId("consumer1")
-                            .setGroupInstanceId(null)
-                            .setProtocolType("consumer")
-                            .setProtocols(protocols))
-                    .build((short) version);
-        } else if (version <= 4) {
+        if (version <= 4) {
             return new JoinGroupRequest.Builder(
                     new JoinGroupRequestData()
                             .setGroupId("group1")


Mime
View raw message