kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove SubscriptionState.Listener and replace with assignmentId tracking (#6559)
Date Thu, 11 Apr 2019 18:55:31 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 02221bd  MINOR: Remove SubscriptionState.Listener and replace with assignmentId tracking (#6559)
02221bd is described below

commit 02221bd907a23041c95ce6446986bff631652b3a
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Apr 11 11:55:14 2019 -0700

    MINOR: Remove SubscriptionState.Listener and replace with assignmentId tracking (#6559)
    
    We have not had great experience with listeners. They make the code harder to understand because they result in indirectly maintained circular dependencies. Often this leads to tricky deadlocks when we try to introduce locking. We were able to remove the Metadata listener in KAFKA-7831. Here we do the same for the listener in SubscriptionState.
    
    Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 26 +++++++-------
 .../consumer/internals/SubscriptionState.java      | 41 ++++++++--------------
 .../clients/consumer/internals/FetcherTest.java    |  3 ++
 .../consumer/internals/SubscriptionStateTest.java  | 20 +++++------
 4 files changed, 39 insertions(+), 51 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 64bc921..93d476f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -110,7 +110,7 @@ import static java.util.Collections.emptyList;
  *     synchronized on the response future.</li>
  * </ul>
  */
-public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
+public class Fetcher<K, V> implements Closeable {
     private final Logger log;
     private final LogContext logContext;
     private final ConsumerNetworkClient client;
@@ -174,8 +174,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         this.requestTimeoutMs = requestTimeoutMs;
         this.isolationLevel = isolationLevel;
         this.sessionHandlers = new HashMap<>();
-
-        subscriptions.addListener(this);
     }
 
     /**
@@ -207,6 +205,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      * @return number of fetches sent
      */
     public synchronized int sendFetches() {
+        // Update metrics in case there was an assignment change
+        sensors.maybeUpdateAssignment(subscriptions);
+
         Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
         for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
             final Node fetchTarget = entry.getKey();
@@ -1076,11 +1077,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch);
     }
 
-    @Override
-    public void onAssignment(Set<TopicPartition> assignment) {
-        sensors.updatePartitionLagAndLeadSensors(assignment);
-    }
-
     /**
      * Clear the buffered data which are not a part of newly assigned partitions
      *
@@ -1428,7 +1424,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         private final Sensor recordsFetchLag;
         private final Sensor recordsFetchLead;
 
-        private Set<TopicPartition> assignedPartitions;
+        private int assignmentId = 0;
+        private Set<TopicPartition> assignedPartitions = Collections.emptySet();
 
         private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
             this.metrics = metrics;
@@ -1491,16 +1488,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             recordsFetched.record(records);
         }
 
-        private void updatePartitionLagAndLeadSensors(Set<TopicPartition> assignedPartitions) {
-            if (this.assignedPartitions != null) {
+        private void maybeUpdateAssignment(SubscriptionState subscription) {
+            int newAssignmentId = subscription.assignmentId();
+            if (this.assignmentId != newAssignmentId) {
+                Set<TopicPartition> newAssignedPartitions = new HashSet<>(subscription.assignedPartitions());
                 for (TopicPartition tp : this.assignedPartitions) {
-                    if (!assignedPartitions.contains(tp)) {
+                    if (!newAssignedPartitions.contains(tp)) {
                         metrics.removeSensor(partitionLagMetricName(tp));
                         metrics.removeSensor(partitionLeadMetricName(tp));
                     }
                 }
+                this.assignedPartitions = newAssignedPartitions;
+                this.assignmentId = newAssignmentId;
             }
-            this.assignedPartitions = assignedPartitions;
         }
 
         private void recordPartitionLead(TopicPartition tp, long lead) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index fe944c5..c28ac87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -91,12 +90,11 @@ public class SubscriptionState {
     /* Default offset reset strategy */
     private final OffsetResetStrategy defaultResetStrategy;
 
-    /* Listeners provide a hook for internal state cleanup (e.g. metrics) on assignment changes */
-    private final List<Listener> listeners = new ArrayList<>();
-
     /* User-provided listener to be invoked when assignment changes */
     private ConsumerRebalanceListener rebalanceListener;
 
+    private int assignmentId = 0;
+
     public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultResetStrategy) {
         this.log = logContext.logger(this.getClass());
         this.defaultResetStrategy = defaultResetStrategy;
@@ -108,6 +106,16 @@ public class SubscriptionState {
     }
 
     /**
+     * Monotonically increasing id which is incremented after every assignment change. This can
+     * be used to check when an assignment has changed.
+     *
+     * @return The current assignment Id
+     */
+    public int assignmentId() {
+        return assignmentId;
+    }
+
+    /**
      * This method sets the subscription type if it is not already set (i.e. when it is NONE),
      * or verifies that the subscription type is equal to the give type when it is set (i.e.
      * when it is not NONE)
@@ -177,7 +185,7 @@ public class SubscriptionState {
         if (this.assignment.partitionSet().equals(partitions))
             return false;
 
-        fireOnAssignment(partitions);
+        assignmentId++;
 
         Set<String> manualSubscribedTopics = new HashSet<>();
         Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
@@ -225,8 +233,7 @@ public class SubscriptionState {
         if (assignmentMatchedSubscription) {
             Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(
                     assignments);
-            fireOnAssignment(assignedPartitionStates.keySet());
-
+            assignmentId++;
             this.assignment.set(assignedPartitionStates);
         }
 
@@ -261,7 +268,7 @@ public class SubscriptionState {
         this.assignment.clear();
         this.subscribedPattern = null;
         this.subscriptionType = SubscriptionType.NONE;
-        fireOnAssignment(Collections.emptySet());
+        this.assignmentId++;
     }
 
     /**
@@ -488,15 +495,6 @@ public class SubscriptionState {
         return rebalanceListener;
     }
 
-    public void addListener(Listener listener) {
-        listeners.add(listener);
-    }
-
-    public void fireOnAssignment(Set<TopicPartition> assignment) {
-        for (Listener listener : listeners)
-            listener.onAssignment(assignment);
-    }
-
     private static Map<TopicPartition, TopicPartitionState> partitionToStateMap(Collection<TopicPartition> assignments) {
         Map<TopicPartition, TopicPartitionState> map = new HashMap<>(assignments.size());
         for (TopicPartition tp : assignments)
@@ -583,14 +581,5 @@ public class SubscriptionState {
 
     }
 
-    public interface Listener {
-        /**
-         * Fired after a new assignment is received (after a group rebalance or when the user manually changes the
-         * assignment).
-         *
-         * @param assignment The topic partitions assigned to the consumer
-         */
-        void onAssignment(Set<TopicPartition> assignment);
-    }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7c6ae6e..68a467f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1825,6 +1825,7 @@ public class FetcherTest {
 
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
+        fetcher.sendFetches();
         assertFalse(allMetrics.containsKey(partitionLagMetric));
     }
 
@@ -1866,6 +1867,7 @@ public class FetcherTest {
 
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
+        fetcher.sendFetches();
         assertFalse(allMetrics.containsKey(partitionLeadMetric));
     }
 
@@ -1908,6 +1910,7 @@ public class FetcherTest {
 
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
+        fetcher.sendFetches();
         assertFalse(allMetrics.containsKey(partitionLagMetric));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 8f8e960..5d4d113 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -28,7 +28,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
@@ -146,25 +145,22 @@ public class SubscriptionStateTest {
     }
 
     @Test
-    public void verifyAssignmentListener() {
-        final AtomicReference<Set<TopicPartition>> assignmentRef = new AtomicReference<>();
-        state.addListener(new SubscriptionState.Listener() {
-            @Override
-            public void onAssignment(Set<TopicPartition> assignment) {
-                assignmentRef.set(assignment);
-            }
-        });
+    public void verifyAssignmentId() {
+        assertEquals(0, state.assignmentId());
         Set<TopicPartition> userAssignment = Utils.mkSet(tp0, tp1);
         state.assignFromUser(userAssignment);
-        assertEquals(userAssignment, assignmentRef.get());
+        assertEquals(1, state.assignmentId());
+        assertEquals(userAssignment, state.assignedPartitions());
 
         state.unsubscribe();
-        assertEquals(Collections.emptySet(), assignmentRef.get());
+        assertEquals(2, state.assignmentId());
+        assertEquals(Collections.emptySet(), state.assignedPartitions());
 
         Set<TopicPartition> autoAssignment = Utils.mkSet(t1p0);
         state.subscribe(singleton(topic1), rebalanceListener);
         assertTrue(state.assignFromSubscribed(autoAssignment));
-        assertEquals(autoAssignment, assignmentRef.get());
+        assertEquals(3, state.assignmentId());
+        assertEquals(autoAssignment, state.assignedPartitions());
     }
 
     @Test


Mime
View raw message