kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2381: Fix concurrent modification on assigned partition while looping over it; reviewed by Jason Gustafson, Aditya Auradkar, Ewen Cheslack-Postava, Ismael Juma and Guozhang Wang
Date Tue, 28 Jul 2015 21:23:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 57386de64 -> 269c2407d


KAFKA-2381: Fix concurrent modification on assigned partition while looping over it; reviewed
by Jason Gustafson, Aditya Auradkar, Ewen Cheslack-Postava, Ismael Juma and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/269c2407
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/269c2407
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/269c2407

Branch: refs/heads/trunk
Commit: 269c2407d4e6e38b3b6be00566c480121b5dc51a
Parents: 57386de
Author: Ashish Singh <asingh@cloudera.com>
Authored: Tue Jul 28 14:23:44 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 28 14:23:44 2015 -0700

----------------------------------------------------------------------
 .../consumer/internals/SubscriptionState.java   |  4 +++-
 .../internals/SubscriptionStateTest.java        | 21 ++++++++++++++++++++
 .../integration/kafka/api/ConsumerTest.scala    | 19 ++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/269c2407/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 4d9a425..8a2cb12 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -15,6 +15,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -83,7 +84,8 @@ public class SubscriptionState {
             throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
         this.subscribedTopics.remove(topic);
         this.needsPartitionAssignment = true;
-        for (TopicPartition tp: assignedPartitions())
+        final List<TopicPartition> existingAssignedPartitions = new ArrayList<>(assignedPartitions());
+        for (TopicPartition tp: existingAssignedPartitions)
             if (topic.equals(tp.topic()))
                 clearPartition(tp);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/269c2407/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 319751c..c47f3fb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -73,6 +73,27 @@ public class SubscriptionStateTest {
         assertAllPositions(tp0, null);
         assertEquals(Collections.singleton(tp1), state.assignedPartitions());
     }
+
+    @Test
+    public void topicUnsubscription() {
+        final String topic = "test";
+        state.subscribe(topic);
+        assertEquals(1, state.subscribedTopics().size());
+        assertTrue(state.assignedPartitions().isEmpty());
+        assertTrue(state.partitionsAutoAssigned());
+        state.changePartitionAssignment(asList(tp0));
+        state.committed(tp0, 1);
+        state.fetched(tp0, 1);
+        state.consumed(tp0, 1);
+        assertAllPositions(tp0, 1L);
+        state.changePartitionAssignment(asList(tp1));
+        assertAllPositions(tp0, null);
+        assertEquals(Collections.singleton(tp1), state.assignedPartitions());
+
+        state.unsubscribe(topic);
+        assertEquals(0, state.subscribedTopics().size());
+        assertTrue(state.assignedPartitions().isEmpty());
+    }
     
     @Test(expected = IllegalArgumentException.class)
     public void cantChangeFetchPositionForNonAssignedPartition() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/269c2407/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 3eb5f95..cca6e94 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -217,6 +217,25 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumer0.close()
   }
 
+  def testUnsubscribeTopic() {
+    val callback = new TestConsumerReassignmentCallback()
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); //
timeout quickly to avoid slow test
+    val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(),
new ByteArrayDeserializer())
+
+    try {
+      consumer0.subscribe(topic)
+
+      // the initial subscription should cause a callback execution
+      while (callback.callsToAssigned == 0)
+        consumer0.poll(50)
+
+      consumer0.unsubscribe(topic)
+      assertEquals(0, consumer0.subscriptions.size())
+    } finally {
+      consumer0.close()
+    }
+  }
+
   private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
     var callsToAssigned = 0
     var callsToRevoked = 0


Mime
View raw message