kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739)
Date Fri, 29 May 2020 16:13:56 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 8194c86  KAFKA-10056; Ensure consumer metadata contains new topics on subscription
change (#8739)
8194c86 is described below

commit 8194c866fc09da84e597e5bf8eb798cf5f71b6c0
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Fri May 29 17:03:49 2020 +0100

    KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (#8739)
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../consumer/internals/SubscriptionState.java      | 12 ++++++-
 .../internals/ConsumerCoordinatorTest.java         | 42 ++++++++++++++++++++++
 .../consumer/internals/SubscriptionStateTest.java  |  6 ++++
 3 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index bd84777..d90b2ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -345,7 +345,17 @@ public class SubscriptionState {
      *   of the current generation; otherwise it returns the same set as {@link #subscription()}
      */
     synchronized Set<String> metadataTopics() {
-        return groupSubscription.isEmpty() ? subscription : groupSubscription;
+        if (groupSubscription.isEmpty())
+            return subscription;
+        else if (groupSubscription.containsAll(subscription))
+            return groupSubscription;
+        else {
+            // When subscription changes `groupSubscription` may be outdated, ensure that
+            // new subscription topics are returned.
+            Set<String> topics = new HashSet<>(groupSubscription);
+            topics.addAll(subscription);
+            return topics;
+        }
     }
 
     synchronized boolean needsMetadata(String topic) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index abf2938..0ac1e69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -646,6 +646,35 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testMetadataTopicsDuringSubscriptionChange() {
+        final String consumerId = "subscription_change";
+        final List<String> oldSubscription = singletonList(topic1);
+        final List<TopicPartition> oldAssignment = Collections.singletonList(t1p);
+        final List<String> newSubscription = singletonList(topic2);
+        final List<TopicPartition> newAssignment = Collections.singletonList(t2p);
+
+        subscriptions.subscribe(toSet(oldSubscription), rebalanceListener);
+        assertEquals(toSet(oldSubscription), subscriptions.metadataTopics());
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        prepareJoinAndSyncResponse(consumerId, 1, oldSubscription, oldAssignment);
+
+        coordinator.poll(time.timer(0));
+        assertEquals(toSet(oldSubscription), subscriptions.metadataTopics());
+
+        subscriptions.subscribe(toSet(newSubscription), rebalanceListener);
+        assertEquals(Utils.mkSet(topic1, topic2), subscriptions.metadataTopics());
+
+        prepareJoinAndSyncResponse(consumerId, 2, newSubscription, newAssignment);
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
+        assertEquals(toSet(newSubscription), subscriptions.metadataTopics());
+    }
+
+    @Test
     public void testPatternJoinGroupLeader() {
         final String consumerId = "leader";
         final List<TopicPartition> assigned = Arrays.asList(t1p, t2p);
@@ -2794,6 +2823,19 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors),
disconnected);
     }
 
+    private void prepareJoinAndSyncResponse(String consumerId, int generation, List<String>
subscription, List<TopicPartition> assignment) {
+        partitionAssignor.prepare(singletonMap(consumerId, assignment));
+        client.prepareResponse(
+                joinGroupLeaderResponse(
+                        generation, consumerId, singletonMap(consumerId, subscription), Errors.NONE));
+        client.prepareResponse(body -> {
+            SyncGroupRequest sync = (SyncGroupRequest) body;
+            return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == generation &&
+                    sync.groupAssignments().containsKey(consumerId);
+        }, syncGroupResponse(assignment, Errors.NONE));
+    }
+
     private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition>
partitions, Errors error) {
         final Map<TopicPartition, Errors> errors = new HashMap<>();
         for (TopicPartition partition : partitions) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index cc74652..ae00b8e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -122,6 +122,12 @@ public class SubscriptionStateTest {
         // `groupSubscribe` does not accumulate
         assertFalse(state.groupSubscribe(singleton(topic1)));
         assertEquals(singleton(topic1), state.metadataTopics());
+
+        state.subscribe(singleton("anotherTopic"), rebalanceListener);
+        assertEquals(Utils.mkSet(topic1, "anotherTopic"), state.metadataTopics());
+
+        assertFalse(state.groupSubscribe(singleton("anotherTopic")));
+        assertEquals(singleton("anotherTopic"), state.metadataTopics());
     }
 
     @Test


Mime
View raw message