kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7194; Fix buffer underflow if onJoinComplete is retried after failure (#5417)
Date Tue, 24 Jul 2018 08:25:29 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c83ecf4  KAFKA-7194; Fix buffer underflow if onJoinComplete is retried after failure (#5417)
c83ecf4 is described below

commit c83ecf4c55a2dbbb89c71080c601417adb8b699c
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Jul 24 01:25:18 2018 -0700

    KAFKA-7194; Fix buffer underflow if onJoinComplete is retried after failure (#5417)
    
    An untimely wakeup can cause ConsumerCoordinator.onJoinComplete to throw a WakeupException
    before completion. On the next poll(), it will be retried, but this leads to an underflow
    error because the buffer containing the assignment data will already have been advanced. The
    solution is to duplicate the buffer passed to onJoinComplete.
    
    Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../consumer/internals/AbstractCoordinator.java    |  9 ++--
 .../consumer/internals/ConsumerCoordinator.java    |  6 +--
 .../internals/ConsumerCoordinatorTest.java         | 55 +++++++++++++++++++++-
 3 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b5c7a66..53834fb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -200,9 +200,8 @@ public abstract class AbstractCoordinator implements Closeable {
                                                                  Map<String, ByteBuffer> allMemberMetadata);
 
     /**
-     * Invoked when a group member has successfully joined a group. If this call is woken up (i.e.
-     * if the invocation raises {@link org.apache.kafka.common.errors.WakeupException}), then it
-     * will be retried on the next call to {@link #ensureActiveGroup()}.
+     * Invoked when a group member has successfully joined a group. If this call fails with an exception,
+     * then it will be retried using the same assignment state on the next call to {@link #ensureActiveGroup()}.
      *
      * @param generation The generation that was joined
      * @param memberId The identifier for the local member in the group
@@ -418,7 +417,9 @@ public abstract class AbstractCoordinator implements Closeable {
             }
 
             if (future.succeeded()) {
-                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
+                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
+                ByteBuffer memberAssignment = future.value().duplicate();
+                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
 
                 // We reset the join group future only after the completion callback returns. This ensures
                 // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ea6d472..f9b77e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -269,10 +269,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             this.joinedSubscription = newJoinedSubscription;
         }
 
-        // update the metadata and enforce a refresh to make sure the fetcher can start
-        // fetching data in the next iteration
+        // Update the metadata to include the full group subscription. The leader will trigger a rebalance
+        // if there are any metadata changes affecting any of the consumed partitions (whether or not this
+        // instance is subscribed to the topics).
         this.metadata.setTopics(subscriptions.groupSubscription());
-        if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new TimeoutException();
 
         // give the assignor a chance to update internal state based on the received assignment
         assignor.onAssignment(assignment);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7c2638c..ba392c6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -875,6 +875,57 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testWakeupFromAssignmentCallback() {
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
+
+        final String topic = "topic1";
+        TopicPartition partition = new TopicPartition(topic, 0);
+        final String consumerId = "follower";
+        Set<String> topics = Collections.singleton(topic);
+        MockRebalanceListener rebalanceListener = new MockRebalanceListener() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                boolean raiseWakeup = this.assignedCount == 0;
+                super.onPartitionsAssigned(partitions);
+
+                if (raiseWakeup)
+                    throw new WakeupException();
+            }
+        };
+
+        subscriptions.subscribe(topics, rebalanceListener);
+        metadata.setTopics(topics);
+
+        // we only have metadata for one topic initially
+        metadata.update(TestUtils.singletonCluster(topic, 1), Collections.emptySet(), time.milliseconds());
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
+
+        // prepare initial rebalance
+        partitionAssignor.prepare(singletonMap(consumerId, Collections.singletonList(partition)));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(Collections.singletonList(partition), Errors.NONE));
+
+
+        // The first call to poll should raise the exception from the rebalance listener
+        try {
+            coordinator.poll(Long.MAX_VALUE);
+            fail("Expected exception thrown from assignment callback");
+        } catch (WakeupException e) {
+        }
+
+        // The second call should retry the assignment callback and succeed
+        coordinator.poll(Long.MAX_VALUE);
+
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(2, rebalanceListener.assignedCount);
+    }
+
+    @Test
     public void testRebalanceAfterTopicUnavailableWithSubscribe() {
         unavailableTopicTest(false, false, Collections.<String>emptySet());
     }
@@ -1901,7 +1952,7 @@ public class ConsumerCoordinatorTest {
 
     private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
         return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId,
-                Collections.<String, ByteBuffer>emptyMap());
+                Collections.emptyMap());
     }
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
@@ -1914,7 +1965,7 @@ public class ConsumerCoordinatorTest {
     }
 
     private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
-        return new OffsetFetchResponse(topLevelError, Collections.<TopicPartition, OffsetFetchResponse.PartitionData>emptyMap());
+        return new OffsetFetchResponse(topLevelError, Collections.emptyMap());
     }
 
     private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset) {


Mime
View raw message