kafka-commits mailing list archives

From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-5154: do not set rejoinNeeded in joinGroup response but in syncGroup response handler
Date Thu, 01 Jun 2017 20:31:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b3036c586 -> e4d829c47


KAFKA-5154: do not set rejoinNeeded in joinGroup response but in syncGroup response handler

The scenario is as follows:
1. The consumer subscribes to topic t1 and begins consuming.
2. A heartbeat fails because the group is rebalancing.
3. ConsumerCoordinator.onJoinPrepare is called.
   3.1 onPartitionsRevoked is called.
4. The consumer becomes the group leader.
5. It sends a sync group request.
6. The sync group request is cancelled due to a disconnection.
7. A fetch request is sent for partitions that have previously been revoked.
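The fix moves the point at which rejoinNeeded is cleared from the JoinGroup response handler to the SyncGroup response handler, so a failed or disconnected SyncGroup leaves the consumer still needing to rejoin rather than fetching partitions that were already revoked. A minimal sketch of that state handling (hypothetical class and method names, not the actual AbstractCoordinator API) follows:

    // Minimal sketch (hypothetical names) of the flag handling this patch changes.
    public class RejoinFlagSketch {
        private boolean rejoinNeeded = true;

        // Before the fix: the flag was cleared as soon as the JoinGroup response
        // arrived, even though the assignment only becomes usable after SyncGroup.
        void onJoinGroupResponseBeforeFix() {
            rejoinNeeded = false; // cleared too early
            // ... the SyncGroup request is sent next and may be disconnected ...
        }

        // After the fix: the flag is cleared only when SyncGroup succeeds, so a
        // disconnected SyncGroup leaves rejoinNeeded == true and the consumer
        // rejoins instead of fetching from revoked partitions.
        void onSyncGroupResponseAfterFix(boolean success) {
            if (success)
                rejoinNeeded = false;
            else
                requestRejoin();
        }

        void requestRejoin() {
            rejoinNeeded = true;
        }

        public static void main(String[] args) {
            RejoinFlagSketch sketch = new RejoinFlagSketch();
            sketch.onJoinGroupResponseBeforeFix();
            System.out.println("rejoinNeeded after join (before fix): " + sketch.rejoinNeeded);      // false -> bug
            sketch.onSyncGroupResponseAfterFix(false);
            System.out.println("rejoinNeeded after failed sync (after fix): " + sketch.rejoinNeeded); // true
        }
    }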

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3181 from dguy/kafka-5154


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1b16acaa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1b16acaa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1b16acaa

Branch: refs/heads/trunk
Commit: 1b16acaaa181ceb214d84e70b8ddc146af9c0c5c
Parents: b3036c5
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Jun 1 13:30:49 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jun 1 13:30:49 2017 -0700

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java |  5 +-
 .../clients/consumer/KafkaConsumerTest.java     | 74 +++++++++++++++++++-
 2 files changed, 76 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1b16acaa/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d0de4bb..e380bae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -457,7 +457,6 @@ public abstract class AbstractCoordinator implements Closeable {
                     } else {
                         AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
                                 joinResponse.memberId(), joinResponse.groupProtocol());
-                        AbstractCoordinator.this.rejoinNeeded = false;
                         if (joinResponse.isLeader()) {
                             onJoinLeader(joinResponse).chain(future);
                         } else {
@@ -537,6 +536,7 @@ public abstract class AbstractCoordinator implements Closeable {
             if (error == Errors.NONE) {
                 sensors.syncLatency.record(response.requestLatencyMs());
                 future.complete(syncResponse.memberAssignment());
+                AbstractCoordinator.this.rejoinNeeded = false;
             } else {
                 requestRejoin();
 
@@ -783,8 +783,9 @@ public abstract class AbstractCoordinator implements Closeable {
         @Override
         public void onFailure(RuntimeException e, RequestFuture<T> future) {
             // mark the coordinator as dead
-            if (e instanceof DisconnectException)
+            if (e instanceof DisconnectException) {
                 coordinatorDead();
+            }
             future.raise(e);
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b16acaa/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index f918a34..94979e7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FetchResponse.PartitionData;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
@@ -98,6 +99,7 @@ import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -1256,6 +1258,77 @@ public class KafkaConsumerTest {
         }
     }
 
+    @Test
+    public void shouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 500;
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                                                                   rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 1000);
+
+        consumer.subscribe(Collections.singleton(topic), getConsumerRebalanceListener(consumer));
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+
+        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
+        client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator);
+
+        client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);
+        client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node);
+
+        consumer.poll(0);
+
+        // heartbeat fails due to rebalance in progress
+        client.prepareResponseFrom(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                return true;
+            }
+        }, new HeartbeatResponse(Errors.REBALANCE_IN_PROGRESS), coordinator);
+
+        // join group
+        final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(Collections.singletonList(topic)));
+
+        // This member becomes the leader
+        final JoinGroupResponse leaderResponse = new JoinGroupResponse(Errors.NONE, 1, assignor.name(), "memberId", "memberId",
+                                                                 Collections.<String, ByteBuffer>singletonMap("memberId", byteBuffer));
+        client.prepareResponseFrom(leaderResponse, coordinator);
+
+        // sync group fails due to disconnect
+        client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator, true);
+
+        // should try and find the new coordinator
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+
+        // rejoin group
+        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
+        client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator);
+
+        client.prepareResponseFrom(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(final AbstractRequest body) {
+                return body instanceof FetchRequest && ((FetchRequest) body).fetchData().containsKey(tp0);
+            }
+        }, fetchResponse(tp0, 1, 1), node);
+        time.sleep(heartbeatIntervalMs);
+        Thread.sleep(heartbeatIntervalMs);
+        final ConsumerRecords<String, String> records = consumer.poll(0);
+        assertFalse(records.isEmpty());
+    }
+
     private void consumerCloseTest(final long closeTimeoutMs,
             List<? extends AbstractResponse> responses,
             long waitMs,
@@ -1360,7 +1433,6 @@ public class KafkaConsumerTest {
         return new ConsumerRebalanceListener() {
             @Override
             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-
             }
 
             @Override

