kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7126; Reduce number of rebalance for large consumer group after a topic is created
Date Thu, 26 Jul 2018 17:31:15 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new b0de9ba  KAFKA-7126; Reduce number of rebalance for large consumer group after a
topic is created
b0de9ba is described below

commit b0de9bad4af60ca9df98974f513579091f431959
Author: Jon Lee <jonlee@linkedin.com>
AuthorDate: Thu Jul 26 10:28:07 2018 -0700

    KAFKA-7126; Reduce number of rebalance for large consumer group after a topic is created
    
    This patch forces metadata update for consumers with pattern subscription at the beginning
of rebalance (retry.backoff.ms is respected). This is to prevent such consumers from detecting
subscription changes (e.g., new topic creation) independently and triggering multiple unnecessary
rebalances. KAFKA-7126 contains detailed scenarios and rationale.
    
    Author: Jon Lee <jonlee@linkedin.com>
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Ted Yu <yuzhihong@gmail.com>,
Dong Lin <lindong28@gmail.com>
    
    Closes #5408 from jonlee2/KAFKA-7126
    
    (cherry picked from commit a932520135d42c7d9731064d96c21ab2fc5de696)
    Signed-off-by: Dong Lin <lindong28@gmail.com>
---
 .../java/org/apache/kafka/clients/Metadata.java    | 16 +++++++-
 .../consumer/internals/ConsumerCoordinator.java    | 10 +++++
 .../internals/ConsumerCoordinatorTest.java         | 44 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index ec07f13..17d9839 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -125,14 +125,26 @@ public final class Metadata implements Closeable {
     }
 
     /**
+     * Return the remaining time until the current cluster info can be updated (i.e., until the backoff time
has elapsed).
+     *
+     * @param nowMs current time in ms
+     * @return remaining time in ms till the cluster info can be updated again
+     */
+    public synchronized long timeToAllowUpdate(long nowMs) {
+        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+    }
+
+    /**
      * The next time to update the cluster info is the maximum of the time the current info
will expire and the time the
      * current info can be updated (i.e. backoff time has elapsed); If an update has been
requested then the expiry time
      * is now
+     *
+     * @param nowMs current time in ms
+     * @return remaining time in ms till updating the cluster info
      */
     public synchronized long timeToNextUpdate(long nowMs) {
         long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs
- nowMs, 0);
-        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
-        return Math.max(timeToExpire, timeToAllowUpdate);
+        return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index e04cdeb..51ae58e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -323,6 +323,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 // we need to ensure that the metadata is fresh before joining initially.
This ensures
                 // that we have matched the pattern against the cluster's topics at least
once before joining.
                 if (subscriptions.hasPatternSubscription()) {
+                    // For a consumer group that uses pattern-based subscription, after a topic
is created,
+                    // any consumer that discovers the topic after metadata refresh can trigger
rebalance
+                    // across the entire consumer group. Multiple rebalances can be triggered
after one topic
+                    // creation if consumers refresh metadata at vastly different times.
We can significantly
+                    // reduce the number of rebalances caused by a single topic creation by
asking consumers to
+                    // refresh metadata before re-joining the group as long as the refresh
backoff time has
+                    // passed.
+                    if (this.metadata.timeToAllowUpdate(currentTime) == 0) {
+                        this.metadata.requestUpdate();
+                    }
                     if (!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed)))
{
                         return false;
                     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index ba392c6..cec56b0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -514,6 +514,50 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
+        // Set up a non-leader consumer with pattern subscription and a cluster containing
one topic matching the
+        // pattern.
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(),
+            time.milliseconds());
+        assertEquals(singleton(topic1), subscriptions.subscription());
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
+
+        // Instrument the test so that metadata will contain two topics after next refresh.
+        client.prepareMetadataUpdate(cluster, Collections.emptySet());
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
+                return sync.memberId().equals(consumerId) &&
+                    sync.generationId() == 1 &&
+                    sync.groupAssignment().isEmpty();
+            }
+        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+
+        // This will trigger rebalance.
+        coordinator.poll(Long.MAX_VALUE);
+
+        // Make sure that the metadata was refreshed during the rebalance and thus subscriptions
now contain two topics.
+        final Set<String> updatedSubscriptionSet = new HashSet<>(Arrays.asList(topic1,
topic2));
+        assertEquals(updatedSubscriptionSet, subscriptions.subscription());
+
+        // Refresh the metadata again. Since there have been no changes since the last refresh,
it won't trigger
+        // a rebalance again.
+        metadata.requestUpdate();
+        client.poll(Long.MAX_VALUE, time.milliseconds());
+        assertFalse(coordinator.rejoinNeededOrPending());
+    }
+
+    @Test
     public void testWakeupDuringJoin() {
         final String consumerId = "leader";
 


Mime
View raw message