kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5611; AbstractCoordinator should handle wakeup raised from onJoinComplete
Date Thu, 27 Jul 2017 06:00:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 c1f583e67 -> 25bcc2176


KAFKA-5611; AbstractCoordinator should handle wakeup raised from onJoinComplete

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3571 from hachikuji/KAFKA-5611

(cherry picked from commit 3620035c45736b74e69921f7c50a1c3857aec334)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/25bcc217
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/25bcc217
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/25bcc217

Branch: refs/heads/0.11.0
Commit: 25bcc217646ddd465a407c0d219111f97f910650
Parents: c1f583e
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Jul 26 22:53:11 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Wed Jul 26 22:58:59 2017 -0700

----------------------------------------------------------------------
 .../consumer/ConsumerRebalanceListener.java     | 16 ++++++++++
 .../consumer/internals/AbstractCoordinator.java | 13 ++++++--
 .../internals/AbstractCoordinatorTest.java      | 32 ++++++++++++++++++++
 3 files changed, 58 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/25bcc217/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index 3a3873a..845bff3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -87,8 +87,16 @@ public interface ConsumerRebalanceListener {
      * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer
KafkaConsumer}
      * <p>
      * <b>NOTE:</b> This method is only called before rebalances. It is not called
prior to {@link KafkaConsumer#close()}.
+     * <p>
+     * It is common for the revocation callback to use the consumer instance in order to
commit offsets. It is possible
+     * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
+     * to be raised from one these nested invocations. In this case, the exception will be
propagated to the current
+     * invocation of {@link KafkaConsumer#poll(long)} in which this callback is being executed.
This means it is not
+     * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer
thread.
      *
      * @param partitions The list of partitions that were assigned to the consumer on the
last rebalance
+     * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call
to {@link KafkaConsumer}
+     * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested
call to {@link KafkaConsumer}
      */
     void onPartitionsRevoked(Collection<TopicPartition> partitions);
 
@@ -100,9 +108,17 @@ public interface ConsumerRebalanceListener {
      * It is guaranteed that all the processes in a consumer group will execute their
      * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
      * {@link #onPartitionsAssigned(Collection)} callback.
+     * <p>
+     * It is common for the assignment callback to use the consumer instance in order to
query offsets. It is possible
+     * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
+     * to be raised from one these nested invocations. In this case, the exception will be
propagated to the current
+     * invocation of {@link KafkaConsumer#poll(long)} in which this callback is being executed.
This means it is not
+     * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer
thread.
      *
      * @param partitions The list of partitions that are now assigned to the consumer (may
include partitions previously
      *            assigned to the consumer)
+     * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call
to {@link KafkaConsumer}
+     * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested
call to {@link KafkaConsumer}
      */
     void onPartitionsAssigned(Collection<TopicPartition> partitions);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/25bcc217/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 742b4ef..a7195b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -181,7 +181,10 @@ public abstract class AbstractCoordinator implements Closeable {
                                                                  Map<String, ByteBuffer>
allMemberMetadata);
 
     /**
-     * Invoked when a group member has successfully joined a group.
+     * Invoked when a group member has successfully joined a group. If this call is woken
up (i.e.
+     * if the invocation raises {@link org.apache.kafka.common.errors.WakeupException}),
then it
+     * will be retried on the next call to {@link #ensureActiveGroup()}.
+     *
      * @param generation The generation that was joined
      * @param memberId The identifier for the local member in the group
      * @param protocol The protocol selected by the coordinator
@@ -359,12 +362,16 @@ public abstract class AbstractCoordinator implements Closeable {
 
             RequestFuture<ByteBuffer> future = initiateJoinGroup();
             client.poll(future);
-            resetJoinGroupFuture();
 
             if (future.succeeded()) {
-                needsJoinPrepare = true;
                 onJoinComplete(generation.generationId, generation.memberId, generation.protocol,
future.value());
+
+                // We reset the join group future only after the completion callback returns.
This ensures
+                // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
+                resetJoinGroupFuture();
+                needsJoinPrepare = true;
             } else {
+                resetJoinGroupFuture();
                 RuntimeException exception = future.exception();
                 if (exception instanceof UnknownMemberIdException ||
                         exception instanceof RebalanceInProgressException ||

http://git-wip-us.apache.org/repos/asf/kafka/blob/25bcc217/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index afebd9d..e35efb7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -479,7 +479,36 @@ public class AbstractCoordinatorTest {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(1, coordinator.onJoinCompleteInvokes);
+
+        awaitFirstHeartbeat(heartbeatReceived);
+    }
+
+    @Test
+    public void testWakeupInOnJoinComplete() throws Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
+        coordinator.wakeupOnJoinComplete = true;
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
+
+        try {
+            coordinator.ensureActiveGroup();
+            fail("Should have woken up from ensureActiveGroup()");
+        } catch (WakeupException e) {
+        }
+
+        assertEquals(1, coordinator.onJoinPrepareInvokes);
+        assertEquals(0, coordinator.onJoinCompleteInvokes);
+        assertFalse(heartbeatReceived.get());
+
         // the join group completes in this poll()
+        coordinator.wakeupOnJoinComplete = false;
         consumerClient.poll(0);
         coordinator.ensureActiveGroup();
 
@@ -534,6 +563,7 @@ public class AbstractCoordinatorTest {
 
         private int onJoinPrepareInvokes = 0;
         private int onJoinCompleteInvokes = 0;
+        private boolean wakeupOnJoinComplete = false;
 
         public DummyCoordinator(ConsumerNetworkClient client,
                                 Metrics metrics,
@@ -567,6 +597,8 @@ public class AbstractCoordinatorTest {
 
         @Override
         protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer
memberAssignment) {
+            if (wakeupOnJoinComplete)
+                throw new WakeupException();
             onJoinCompleteInvokes++;
         }
     }


Mime
View raw message