kafka-commits mailing list archives

From bbej...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8011: Fix for race condition causing concurrent modification exception (#6338)
Date Thu, 28 Feb 2019 23:58:28 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 1b522a1  KAFKA-8011: Fix for race condition causing concurrent modification exception (#6338)
1b522a1 is described below

commit 1b522a1d739e7bcafd0c134f31ff6268f0437f25
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Thu Feb 28 18:54:25 2019 -0500

    KAFKA-8011: Fix for race condition causing concurrent modification exception (#6338)
    
    In RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated() and
    RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted(), a race condition
    exists: the ConsumerRebalanceListener in the test modifies the list of subscribed
    topics while the test's success condition is comparing that same list instance
    against the expected values.
    
    This PR should fix this race condition by using a CopyOnWriteArrayList, which guarantees
    safe traversal of the list even when a concurrent modification is happening.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../apache/kafka/streams/integration/RegexSourceIntegrationTest.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 2873593..fc7a14c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -60,6 +60,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
@@ -152,7 +153,7 @@ public class RegexSourceIntegrationTest {
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
         pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
-        final List<String> assignedTopics = new ArrayList<>();
+        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
         streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
             @Override
             public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
@@ -190,7 +191,7 @@ public class RegexSourceIntegrationTest {
 
         pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
 
-        final List<String> assignedTopics = new ArrayList<>();
+        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
         streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
             @Override
             public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
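
The difference between the two list types in the diff above can be seen in a small, single-threaded sketch. It is not part of the commit: the class name SnapshotIterationDemo and its methods are hypothetical, and the mid-traversal add() only stands in for the rebalance listener updating the list while the test condition reads it. In the real test the modification comes from another thread, where ArrayList's fail-fast check is best-effort rather than guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not taken from the Kafka code base.
public class SnapshotIterationDemo {

    // Traverses the list and adds one element after visiting the first entry,
    // imitating a listener that updates the list during a read.
    // Returns how many elements the traversal visited.
    public static int traverseWhileAdding(final List<String> topics) {
        int visited = 0;
        for (final String topic : topics) {
            if (visited == 0) {
                topics.add("TEST-TOPIC-9");
            }
            visited++;
        }
        return visited;
    }

    // Returns true if the traversal aborts with a ConcurrentModificationException.
    public static boolean failsWithConcurrentModification(final List<String> topics) {
        try {
            traverseWhileAdding(topics);
            return false;
        } catch (final ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final List<String> plain =
            new ArrayList<>(Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"));
        final List<String> copyOnWrite =
            new CopyOnWriteArrayList<>(Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"));

        // ArrayList iterators are fail-fast: the add() invalidates the iterator.
        System.out.println("ArrayList fails: " + failsWithConcurrentModification(plain));

        // CopyOnWriteArrayList iterators walk the array snapshot taken when the
        // iterator was created, so the traversal completes over the old contents.
        System.out.println("CopyOnWriteArrayList visited: " + traverseWhileAdding(copyOnWrite));
        System.out.println("CopyOnWriteArrayList size afterwards: " + copyOnWrite.size());
    }
}
```

The trade-off is that a CopyOnWriteArrayList reader may see a slightly stale snapshot and every write copies the backing array. That seems acceptable here because the list is small and a test condition that is polled repeatedly would pick up the new contents on a later check.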

