kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-4950; Fix ConcurrentModificationException on assigned-partitions metric update (#3907)
Date Wed, 08 Aug 2018 22:59:03 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new f5b0e80  KAFKA-4950; Fix ConcurrentModificationException on assigned-partitions metric update (#3907)
f5b0e80 is described below

commit f5b0e8079a8a3576a0a90affa11aa316097b4793
Author: Sébastien Launay <sebastienlaunay@gmail.com>
AuthorDate: Wed Aug 8 15:43:43 2018 -0700

    KAFKA-4950; Fix ConcurrentModificationException on assigned-partitions metric update (#3907)
    
    Use a volatile field to track the size of the set of assigned partitions, avoiding concurrent access to the underlying linked hash map.
    
    Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    |  3 +-
 .../consumer/internals/SubscriptionState.java      |  8 +++
 .../kafka/common/internals/PartitionStates.java    | 19 ++++++-
 .../internals/ConsumerCoordinatorTest.java         | 62 +++++++++++++++++++++-
 .../consumer/internals/SubscriptionStateTest.java  | 23 ++++++++
 5 files changed, 111 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3c99c96..f9737a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -882,7 +882,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             Measurable numParts =
                 new Measurable() {
                     public double measure(MetricConfig config, long now) {
-                        return subscriptions.assignedPartitions().size();
+                        // Get the number of assigned partitions in a thread safe manner
+                        return subscriptions.numAssignedPartitions();
                     }
                 };
             metrics.addMetric(metrics.metricName("assigned-partitions",
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index a81cbcb..85845d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -268,6 +268,14 @@ public class SubscriptionState {
         return this.assignment.partitionSet();
     }
 
+    /**
+     * Provides the number of assigned partitions in a thread safe manner.
+     * @return the number of assigned partitions.
+     */
+    public int numAssignedPartitions() {
+        return this.assignment.size();
+    }
+
     public List<TopicPartition> fetchablePartitions() {
         List<TopicPartition> fetchable = new ArrayList<>(assignment.size());
         for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates())
{
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
index 605372c..5b904c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -36,11 +36,17 @@ import java.util.Set;
  * topic would "wrap around" and appear twice. However, as partitions are fetched in different
orders and partition
  * leadership changes, we will deviate from the optimal. If this turns out to be an issue
in practice, we can improve
  * it by tracking the partitions per node or calling `set` every so often.
+ *
+ * Note that this class is not thread-safe with the exception of {@link #size()} which returns
the number of
+ * partitions currently tracked.
  */
 public class PartitionStates<S> {
 
     private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
 
+    /* the number of partitions that are currently assigned available in a thread safe manner
*/
+    private volatile int size = 0;
+
     public PartitionStates() {}
 
     public void moveToEnd(TopicPartition topicPartition) {
@@ -52,10 +58,12 @@ public class PartitionStates<S> {
     public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {
         map.remove(topicPartition);
         map.put(topicPartition, state);
+        updateSize();
     }
 
     public void remove(TopicPartition topicPartition) {
         map.remove(topicPartition);
+        updateSize();
     }
 
     /**
@@ -67,6 +75,7 @@ public class PartitionStates<S> {
 
     public void clear() {
         map.clear();
+        updateSize();
     }
 
     public boolean contains(TopicPartition topicPartition) {
@@ -95,8 +104,11 @@ public class PartitionStates<S> {
         return map.get(topicPartition);
     }
 
+    /**
+     * Get the number of partitions that are currently being tracked. This is thread-safe.
+     */
     public int size() {
-        return map.size();
+        return size;
     }
 
     /**
@@ -108,6 +120,11 @@ public class PartitionStates<S> {
     public void set(Map<TopicPartition, S> partitionToState) {
         map.clear();
         update(partitionToState);
+        updateSize();
+    }
+
+    private void updateSize() {
+        size = map.size();
     }
 
     private void update(Map<TopicPartition, S> partitionToState) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 6c5d174..c683a82 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
@@ -54,6 +56,7 @@ import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -76,6 +79,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
@@ -435,7 +439,7 @@ public class ConsumerCoordinatorTest {
         coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
-        assertEquals(2, subscriptions.assignedPartitions().size());
+        assertEquals(2, subscriptions.numAssignedPartitions());
         assertEquals(2, subscriptions.groupSubscription().size());
         assertEquals(2, subscriptions.subscription().size());
         assertEquals(1, rebalanceListener.revokedCount);
@@ -607,7 +611,7 @@ public class ConsumerCoordinatorTest {
         coordinator.joinGroupIfNeeded();
 
         assertFalse(coordinator.needRejoin());
-        assertEquals(2, subscriptions.assignedPartitions().size());
+        assertEquals(2, subscriptions.numAssignedPartitions());
         assertEquals(2, subscriptions.subscription().size());
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -1593,6 +1597,60 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testThreadSafeAssignedPartitionsMetric() throws Exception {
+        // Get the assigned-partitions metric
+        final Metric metric = metrics.metric(new MetricName("assigned-partitions", "consumer"
+ groupId + "-coordinator-metrics",
+                "", Collections.<String, String>emptyMap()));
+
+        // Start polling the metric in the background
+        final AtomicBoolean doStop = new AtomicBoolean();
+        final AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
+        final AtomicInteger observedSize = new AtomicInteger();
+
+        Thread poller = new Thread() {
+            @Override
+            public void run() {
+                // Poll as fast as possible to reproduce ConcurrentModificationException
+                while (!doStop.get()) {
+                    try {
+                        int size = ((Double) metric.metricValue()).intValue();
+                        observedSize.set(size);
+                    } catch (Exception e) {
+                        exceptionHolder.set(e);
+                        return;
+                    }
+                }
+            }
+        };
+        poller.start();
+
+        // Assign two partitions to trigger a metric change that can lead to ConcurrentModificationException
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady();
+
+        // Change the assignment several times to increase likelihood of concurrent updates
+        Set<TopicPartition> partitions = new HashSet<>();
+        final int totalPartitions = 10;
+        for (int partition = 0; partition < totalPartitions; partition++) {
+            partitions.add(new TopicPartition(topic1, partition));
+            subscriptions.assignFromUser(partitions);
+        }
+
+        // Wait for the metric poller to observe the final assignment change or raise an
error
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return observedSize.get() == totalPartitions || exceptionHolder.get() !=
null;
+            }
+        }, "Failed to observe expected assignment change");
+
+        doStop.set(true);
+        poller.join();
+
+        assertNull("Failed fetching the metric at least once", exceptionHolder.get());
+    }
+
+    @Test
     public void testCloseDynamicAssignment() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
         gracefulCloseTest(coordinator, true);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 24255e8..05287e0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -49,12 +49,14 @@ public class SubscriptionStateTest {
     public void partitionAssignment() {
         state.assignFromUser(singleton(tp0));
         assertEquals(singleton(tp0), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
         assertFalse(state.hasAllFetchPositions());
         state.seek(tp0, 1);
         assertTrue(state.isFetchable(tp0));
         assertEquals(1L, state.position(tp0).longValue());
         state.assignFromUser(Collections.<TopicPartition>emptySet());
         assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
         assertFalse(state.isAssigned(tp0));
         assertFalse(state.isFetchable(tp0));
     }
@@ -64,28 +66,34 @@ public class SubscriptionStateTest {
         state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
         // assigned partitions should immediately change
         assertEquals(2, state.assignedPartitions().size());
+        assertEquals(2, state.numAssignedPartitions());
         assertTrue(state.assignedPartitions().contains(tp0));
         assertTrue(state.assignedPartitions().contains(tp1));
 
         state.unsubscribe();
         // assigned partitions should immediately change
         assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
 
         state.subscribe(singleton(topic1), rebalanceListener);
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
 
         state.assignFromSubscribed(singleton(t1p0));
         // assigned partitions should immediately change
         assertEquals(singleton(t1p0), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
 
         state.subscribe(singleton(topic), rebalanceListener);
         // assigned partitions should remain unchanged
         assertEquals(singleton(t1p0), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
 
         state.unsubscribe();
         // assigned partitions should immediately change
         assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
     }
 
     @Test
@@ -93,37 +101,45 @@ public class SubscriptionStateTest {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
 
         state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic)));
         // assigned partitions should remain unchanged
         assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
 
         state.assignFromSubscribed(singleton(tp1));
         // assigned partitions should immediately change
         assertEquals(singleton(tp1), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
         assertEquals(singleton(topic), state.subscription());
 
         state.assignFromSubscribed(Collections.singletonList(t1p0));
         // assigned partitions should immediately change
         assertEquals(singleton(t1p0), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
         assertEquals(singleton(topic), state.subscription());
 
         state.subscribe(Pattern.compile(".*t"), rebalanceListener);
         // assigned partitions should remain unchanged
         assertEquals(singleton(t1p0), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
 
         state.subscribeFromPattern(singleton(topic));
         // assigned partitions should remain unchanged
         assertEquals(singleton(t1p0), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
 
         state.assignFromSubscribed(Collections.singletonList(tp0));
         // assigned partitions should immediately change
         assertEquals(singleton(tp0), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
         assertEquals(singleton(topic), state.subscription());
 
         state.unsubscribe();
         // assigned partitions should immediately change
         assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
     }
 
     @Test
@@ -169,6 +185,7 @@ public class SubscriptionStateTest {
         state.subscribe(singleton(topic), rebalanceListener);
         assertEquals(1, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
         assertTrue(state.partitionsAutoAssigned());
         state.assignFromSubscribed(singleton(tp0));
         state.seek(tp0, 1);
@@ -178,6 +195,7 @@ public class SubscriptionStateTest {
         assertFalse(state.isAssigned(tp0));
         assertFalse(state.isFetchable(tp1));
         assertEquals(singleton(tp1), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
     }
 
     @Test
@@ -261,6 +279,7 @@ public class SubscriptionStateTest {
         state.unsubscribe();
         state.assignFromUser(singleton(tp0));
         assertEquals(singleton(tp0), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
     }
 
     @Test
@@ -269,17 +288,21 @@ public class SubscriptionStateTest {
         state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1)));
         state.assignFromSubscribed(singleton(tp1));
         assertEquals(singleton(tp1), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
 
         state.unsubscribe();
         assertEquals(0, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
 
         state.assignFromUser(singleton(tp0));
         assertEquals(singleton(tp0), state.assignedPartitions());
+        assertEquals(1, state.numAssignedPartitions());
 
         state.unsubscribe();
         assertEquals(0, state.subscription().size());
         assertTrue(state.assignedPartitions().isEmpty());
+        assertEquals(0, state.numAssignedPartitions());
     }
 
     private static class MockRebalanceListener implements ConsumerRebalanceListener {


Mime
View raw message