kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2697: client-side support for leave group
Date Wed, 04 Nov 2015 22:58:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8de62253a -> ef5d168cc


KAFKA-2697: client-side support for leave group

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava, Guozhang Wang

Closes #414 from hachikuji/KAFKA-2697


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef5d168c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef5d168c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef5d168c

Branch: refs/heads/trunk
Commit: ef5d168cc8f10ad4f0efe9df4cbe849a4b35496e
Parents: 8de6225
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Nov 4 15:04:03 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 4 15:04:03 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java | 12 ++--
 .../kafka/clients/consumer/KafkaConsumer.java   | 18 +++--
 .../kafka/clients/consumer/MockConsumer.java    |  2 +-
 .../consumer/internals/AbstractCoordinator.java | 76 +++++++++++++++++---
 .../consumer/internals/ConsumerCoordinator.java | 28 ++++----
 .../internals/ConsumerNetworkClient.java        | 15 ++--
 .../apache/kafka/common/protocol/Protocol.java  | 10 +--
 .../common/requests/LeaveGroupRequest.java      | 16 ++---
 .../internals/ConsumerCoordinatorTest.java      | 66 ++++++++++++++++-
 .../runtime/distributed/WorkerCoordinator.java  |  9 +--
 .../src/main/scala/kafka/server/KafkaApis.scala |  2 +-
 11 files changed, 190 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index a3d8776..c9f114d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -12,18 +12,18 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
 /**
  * @see KafkaConsumer
  * @see MockConsumer

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index a6be519..f3d2e15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -727,7 +727,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
         try {
             log.debug("Unsubscribed all topics or patterns and assigned partitions");
             this.subscriptions.unsubscribe();
-            this.coordinator.resetGeneration();
+            this.coordinator.maybeLeaveGroup(false);
             this.metadata.needMetadataForAllTopics(false);
         } finally {
             release();
@@ -790,11 +790,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
                 throw new IllegalArgumentException("Timeout must not be negative");
 
             // poll for new data until the timeout expires
+            long start = time.milliseconds();
             long remaining = timeout;
-            while (remaining >= 0) {
-                long start = time.milliseconds();
+            do {
                 Map<TopicPartition, List<ConsumerRecord<K, V>>> records
= pollOnce(remaining);
-
                 if (!records.isEmpty()) {
                     // if data is available, then return it, but first send off the
                     // next round of fetches to enable pipelining while the user is
@@ -804,8 +803,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
                     return new ConsumerRecords<>(records);
                 }
 
-                remaining -= time.milliseconds() - start;
-            }
+                long elapsed = time.milliseconds() - start;
+                remaining = timeout - elapsed;
+            } while (remaining > 0);
 
             return ConsumerRecords.empty();
         } finally {
@@ -1157,6 +1157,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
         }
     }
 
+    /**
+     * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is
+     * enabled, this will commit the current offsets.
+     */
     @Override
     public void close() {
         acquire();
@@ -1179,7 +1183,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
 
     private void close(boolean swallowException) {
         log.trace("Closing the Kafka consumer.");
-        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
         this.closed = true;
         ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
         ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 25c0c2c..894bc93 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -31,8 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.regex.Pattern;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 
 /**
  * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka.
This class is <i> not

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 781ff78..e9af6c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -45,6 +47,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -77,7 +80,7 @@ import java.util.concurrent.TimeUnit;
  * {@link #onJoinComplete(int, String, String, ByteBuffer)}.
  *
  */
-public abstract class AbstractCoordinator {
+public abstract class AbstractCoordinator implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class);
 
@@ -196,15 +199,6 @@ public abstract class AbstractCoordinator {
     }
 
     /**
-     * Reset the generation/memberId tracked by this member
-     */
-    public void resetGeneration() {
-        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
-        this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
-        rejoinNeeded = true;
-    }
-
-    /**
      * Ensure that the group is active (i.e. joined and synced)
      */
     public void ensureActiveGroup() {
@@ -514,7 +508,6 @@ public abstract class AbstractCoordinator {
         return false;
     }
 
-
     /**
      * Mark the current coordinator as dead.
      */
@@ -526,6 +519,67 @@ public abstract class AbstractCoordinator {
     }
 
     /**
+     * Close the coordinator, waiting if needed to send LeaveGroup.
+     */
+    @Override
+    public void close() {
+        maybeLeaveGroup(true);
+    }
+
+    /**
+     * Leave the current group and reset local generation/memberId.
+     */
+    public void maybeLeaveGroup(boolean awaitResponse) {
+        if (!coordinatorUnknown() && generation > 0) {
+            // this is a minimal effort attempt to leave the group. we do not
+            // attempt any resending if the request fails or times out.
+            sendLeaveGroupRequest(awaitResponse);
+        }
+
+        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
+        this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+        rejoinNeeded = true;
+    }
+
+    private void sendLeaveGroupRequest(boolean awaitResponse) {
+        LeaveGroupRequest request = new LeaveGroupRequest(groupId, memberId);
+        RequestFuture<Void> future = client.send(coordinator, ApiKeys.LEAVE_GROUP,
request)
+                .compose(new LeaveGroupResponseHandler());
+
+        future.addListener(new RequestFutureListener<Void>() {
+            @Override
+            public void onSuccess(Void value) {}
+
+            @Override
+            public void onFailure(RuntimeException e) {
+                log.info("LeaveGroup request failed with error", e);
+            }
+        });
+
+        if (awaitResponse)
+            client.poll(future);
+        else
+            client.poll(future, 0);
+    }
+
+    private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse,
Void> {
+        @Override
+        public LeaveGroupResponse parse(ClientResponse response) {
+            return new LeaveGroupResponse(response.responseBody());
+        }
+
+        @Override
+        public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future)
{
+            // process the response
+            short errorCode = leaveResponse.errorCode();
+            if (errorCode == Errors.NONE.code())
+                future.complete(null);
+            else
+                future.raise(Errors.forCode(errorCode));
+        }
+    }
+
+    /**
      * Send a heartbeat request now (visible only for testing).
      */
     public RequestFuture<Void> sendHeartbeatRequest() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index c7323cb..25d389c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -15,9 +15,6 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
@@ -26,6 +23,9 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -45,7 +45,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -58,7 +57,7 @@ import java.util.Set;
 /**
  * This class manages the coordination process with the consumer coordinator.
  */
-public final class ConsumerCoordinator extends AbstractCoordinator implements Closeable {
+public final class ConsumerCoordinator extends AbstractCoordinator {
 
     private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
 
@@ -305,15 +304,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements
Cl
 
     @Override
     public void close() {
-        // commit offsets prior to closing if auto-commit enabled
-        while (true) {
-            try {
-                maybeAutoCommitOffsetsSync();
-                return;
-            } catch (WakeupException e) {
-                // ignore wakeups while closing to ensure we have a chance to commit
-                continue;
+        try {
+            while (true) {
+                try {
+                    maybeAutoCommitOffsetsSync();
+                    return;
+                } catch (WakeupException e) {
+                    // ignore wakeups while closing to ensure we have a chance to commit
+                    continue;
+                }
             }
+        } finally {
+            super.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4757fc4..f1f1cc7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -17,8 +17,8 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.RequestHeader;
@@ -162,12 +162,15 @@ public class ConsumerNetworkClient implements Closeable {
      * @throws WakeupException if {@link #wakeup()} is called from another thread
      */
     public boolean poll(RequestFuture<?> future, long timeout) {
-        long now = time.milliseconds();
-        long deadline = now + timeout;
-        while (!future.isDone() && now < deadline) {
-            poll(deadline - now, now);
+        long begin = time.milliseconds();
+        long remaining = timeout;
+        long now = begin;
+        do {
+            poll(remaining, now);
             now = time.milliseconds();
-        }
+            long elapsed = now - begin;
+            remaining = timeout - elapsed;
+        } while (!future.isDone() && remaining > 0);
         return future.isDone();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 00560db..ff844e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -528,10 +528,10 @@ public class Protocol {
     public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0};
 
     /* Heartbeat api */
-    public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING,
"The consumer group id."),
+    public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING,
"The group id."),
                                                                  new Field("group_generation_id",
                                                                            INT32,
-                                                                           "The generation
of the consumer group."),
+                                                                           "The generation
of the group."),
                                                                  new Field("member_id",
                                                                            STRING,
                                                                            "The member id
assigned by the group coordinator."));
@@ -542,10 +542,10 @@ public class Protocol {
     public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
 
     /* Leave group api */
-    public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
STRING, "The consumer group id."),
-                                                                   new Field("consumer_id",
+    public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
STRING, "The group id."),
+                                                                   new Field("member_id",
                                                                              STRING,
-                                                                             "The consumer
id assigned by the group coordinator."));
+                                                                             "The member
id assigned by the group coordinator."));
 
     public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code",
INT16));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index fcc056a..05bdf90 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -23,23 +23,23 @@ public class LeaveGroupRequest extends AbstractRequest {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEAVE_GROUP.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static final String MEMBER_ID_KEY_NAME = "member_id";
 
     private final String groupId;
-    private final String consumerId;
+    private final String memberId;
 
-    public LeaveGroupRequest(String groupId, String consumerId) {
+    public LeaveGroupRequest(String groupId, String memberId) {
         super(new Struct(CURRENT_SCHEMA));
         struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        struct.set(MEMBER_ID_KEY_NAME, memberId);
         this.groupId = groupId;
-        this.consumerId = consumerId;
+        this.memberId = memberId;
     }
 
     public LeaveGroupRequest(Struct struct) {
         super(struct);
         groupId = struct.getString(GROUP_ID_KEY_NAME);
-        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+        memberId = struct.getString(MEMBER_ID_KEY_NAME);
     }
 
     @Override
@@ -57,8 +57,8 @@ public class LeaveGroupRequest extends AbstractRequest {
         return groupId;
     }
 
-    public String consumerId() {
-        return consumerId;
+    public String memberId() {
+        return memberId;
     }
 
     public static LeaveGroupRequest parse(ByteBuffer buffer, int versionId) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7fd6d88..391f719 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -35,7 +35,10 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -362,6 +365,64 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testLeaveGroupOnClose() {
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        final AtomicBoolean received = new AtomicBoolean(false);
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                received.set(true);
+                LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body());
+                return leaveRequest.memberId().equals(consumerId) &&
+                        leaveRequest.groupId().equals(groupId);
+            }
+        }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
+        coordinator.close();
+        assertTrue(received.get());
+    }
+
+    @Test
+    public void testMaybeLeaveGroup() {
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        final AtomicBoolean received = new AtomicBoolean(false);
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                received.set(true);
+                LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body());
+                return leaveRequest.memberId().equals(consumerId) &&
+                        leaveRequest.groupId().equals(groupId);
+            }
+        }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
+        coordinator.maybeLeaveGroup(false);
+        assertTrue(received.get());
+        assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, coordinator.memberId);
+        assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, coordinator.generation);
+    }
+
+    @Test
     public void testRebalanceInProgressOnSyncGroup() {
         final String consumerId = "consumer";
 
@@ -543,7 +604,7 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testResetGeneration() {
+    public void testCommitAfterLeaveGroup() {
         // enable auto-assignment
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
 
@@ -555,8 +616,9 @@ public class ConsumerCoordinatorTest {
         coordinator.ensurePartitionAssignment();
 
         // now switch to manual assignment
+        client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct());
         subscriptions.unsubscribe();
-        coordinator.resetGeneration();
+        coordinator.maybeLeaveGroup(false);
         subscriptions.assignFromUser(Arrays.asList(tp));
 
         // the client should not reuse generation/memberId from auto-subscribed generation

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
index d11165c..c748971 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
@@ -242,14 +242,15 @@ public final class WorkerCoordinator extends AbstractCoordinator implements
Clos
         return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed())
|| rejoinRequested;
     }
 
-    @Override
-    public void close() {
-    }
-
     public String memberId() {
         return this.memberId;
     }
 
+    @Override
+    public void close() {
+        super.close();
+    }
+
     private class CopycatWorkerCoordinatorMetrics {
         public final Metrics metrics;
         public final String metricGrpName;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5d168c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 21434f7..df064e4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -895,7 +895,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // let the coordinator to handle leave-group
       coordinator.handleLeaveGroup(
         leaveGroupRequest.groupId(),
-        leaveGroupRequest.consumerId(),
+        leaveGroupRequest.memberId(),
         sendResponseCallback)
     }
   }


Mime
View raw message