kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2753: improve SyncGroup error handling in client
Date Thu, 05 Nov 2015 18:16:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a4551773c -> 7eee11451


KAFKA-2753: improve SyncGroup error handling in client

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #433 from hachikuji/KAFKA-2753


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7eee1145
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7eee1145
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7eee1145

Branch: refs/heads/trunk
Commit: 7eee11451e1f0d17efa27775becfb370a9894d56
Parents: a455177
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Nov 5 10:22:21 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Nov 5 10:22:21 2015 -0800

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 28 +++++++---
 .../internals/ConsumerCoordinatorTest.java      | 55 +++++++++++++++++++-
 2 files changed, 76 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7eee1145/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index e9af6c8..44371cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -417,15 +417,31 @@ public abstract class AbstractCoordinator implements Closeable {
         @Override
         public void handle(SyncGroupResponse syncResponse,
                            RequestFuture<ByteBuffer> future) {
-            short errorCode = syncResponse.errorCode();
-            if (errorCode == Errors.NONE.code()) {
-                future.complete(syncResponse.memberAssignment());
+            Errors error = Errors.forCode(syncResponse.errorCode());
+            if (error == Errors.NONE) {
+                log.debug("Received successful sync group response for group {}: {}", groupId, syncResponse.toStruct());
                 sensors.syncLatency.record(response.requestLatencyMs());
-            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
-                future.raise(new GroupAuthorizationException(groupId));
+                future.complete(syncResponse.memberAssignment());
             } else {
                 AbstractCoordinator.this.rejoinNeeded = true;
-                future.raise(Errors.forCode(errorCode));
+                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                    future.raise(new GroupAuthorizationException(groupId));
+                } else if (error == Errors.REBALANCE_IN_PROGRESS) {
+                    log.info("SyncGroup for group {} failed due to coordinator rebalance, rejoining the group", groupId);
+                    future.raise(error);
+                } else if (error == Errors.UNKNOWN_MEMBER_ID
+                        || error == Errors.ILLEGAL_GENERATION) {
+                    log.info("SyncGroup for group {} failed due to {}, rejoining the group", groupId, error);
+                    AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+                    future.raise(error);
+                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+                        || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+                    log.info("SyncGroup for group {} failed due to {}, will find new coordinator and rejoin", groupId, error);
+                    coordinatorDead();
+                    future.raise(error);
+                } else {
+                    future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.exception().getMessage()));
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7eee1145/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 391f719..8e47fc3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
@@ -422,6 +423,52 @@ public class ConsumerCoordinatorTest {
         assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, coordinator.generation);
     }
 
+    @Test(expected = KafkaException.class)
+    public void testUnexpectedErrorOnSyncGroup() {
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // join initially, but let the coordinator return an unexpected error on sync
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN.code()));
+        coordinator.ensurePartitionAssignment();
+    }
+
+    @Test
+    public void testUnknownMemberIdOnSyncGroup() {
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // join initially, but let the coordinator return unknown member id on sync
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_MEMBER_ID.code()));
+
+        // now we should see a new join with the empty UNKNOWN_MEMBER_ID
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                JoinGroupRequest joinRequest = new JoinGroupRequest(request.request().body());
+                return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
+            }
+        }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+
+        coordinator.ensurePartitionAssignment();
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+    }
+
     @Test
     public void testRebalanceInProgressOnSyncGroup() {
         final String consumerId = "consumer";
@@ -461,7 +508,13 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.ILLEGAL_GENERATION.code()));
 
         // then let the full join/sync finish successfully
-        client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                JoinGroupRequest joinRequest = new JoinGroupRequest(request.request().body());
+                return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
+            }
+        }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
 
         coordinator.ensurePartitionAssignment();


Mime
View raw message