kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8972 (2.4 blocker): correctly release lost partitions during consumer.unsubscribe() (#7441)
Date Tue, 29 Oct 2019 17:50:31 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 9dce3e7  KAFKA-8972 (2.4 blocker): correctly release lost partitions during consumer.unsubscribe()
(#7441)
9dce3e7 is described below

commit 9dce3e75353f10dd9770e776c227f47bbe993230
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Tue Oct 29 10:41:25 2019 -0700

    KAFKA-8972 (2.4 blocker): correctly release lost partitions during consumer.unsubscribe()
(#7441)
    
    Inside onLeavePrepare we would look into the assignment and try to revoke the owned tasks
and notify users via RebalanceListener#onPartitionsRevoked, and then clear the assignment.
    
    However, the subscription's assignment is already cleared in this.subscriptions.unsubscribe();
which means user's rebalance listener would never be triggered. In other words, from consumer
client's pov nothing is owned after unsubscribe, but from the user caller's pov the partitions
are not revoked yet. For callers like Kafka Streams which rely on the rebalance listener to
maintain their internal state, this leads to inconsistent state management and failure cases.
    
    Before KIP-429 this issue was hidden, since every time the consumer re-joined the group
later, it would still revoke everything anyway, regardless of the passed-in parameters of
the rebalance listener; with KIP-429 this is easier to reproduce.
    
    Our fixes are as follows:
    
    • Inside unsubscribe, first do onLeavePrepare / maybeLeaveGroup and then subscription.unsubscribe.
This way we are guaranteed that the streams' tasks are all closed as revoked by then.
    • [Optimization] If the generation is reset due to fatal error from join / hb response
etc, then we know that all partitions are lost, and we should not trigger onPartitionsRevoked,
but instead just onPartitionsLost inside onLeavePrepare. This is because we don't want to
commit for lost tasks during a rebalance, which is doomed to fail as we don't have any generation
info.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>,
Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |   2 +-
 .../consumer/internals/ConsumerCoordinator.java    |   7 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 100 +++++++++++++++++----
 .../internals/ConsumerCoordinatorTest.java         |  30 ++++++-
 .../internals/StreamsRebalanceListener.java        |   6 +-
 .../streams/processor/internals/TaskManager.java   |  14 +++
 tests/kafkatest/services/streams.py                |  14 ++-
 .../tests/streams/streams_broker_bounce_test.py    |  13 +--
 8 files changed, 157 insertions(+), 29 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f12beaf..5a5cad8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1069,11 +1069,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
         acquireAndEnsureOpen();
         try {
             fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
-            this.subscriptions.unsubscribe();
             if (this.coordinator != null) {
                 this.coordinator.onLeavePrepare();
                 this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
             }
+            this.subscriptions.unsubscribe();
             log.info("Unsubscribed all topics or patterns and assigned partitions");
         } finally {
             release();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index bceb9b8..d5b3061 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -696,7 +696,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         Set<TopicPartition> droppedPartitions = new HashSet<>(subscriptions.assignedPartitions());
 
         if (subscriptions.partitionsAutoAssigned() && !droppedPartitions.isEmpty())
{
-            final Exception e = invokePartitionsRevoked(droppedPartitions);
+            final Exception e;
+            if (generation() != Generation.NO_GENERATION) {
+                e = invokePartitionsRevoked(droppedPartitions);
+            } else {
+                e = invokePartitionsLost(droppedPartitions);
+            }
 
             subscriptions.assignFromSubscribed(Collections.emptySet());
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 68fac2b..e939d96 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -150,6 +150,12 @@ public class KafkaConsumerTest {
     private final String groupId = "mock-group";
     private final Optional<String> groupInstanceId = Optional.of("mock-instance");
 
+    private final String partitionRevoked = "Hit partition revoke ";
+    private final String partitionAssigned = "Hit partition assign ";
+    private final String partitionLost = "Hit partition lost ";
+
+    private final Collection<TopicPartition> singleTopicPartition = Collections.singleton(new
TopicPartition(topic, 0));
+
     @Test
     public void testMetricsReporterAutoGeneratedClientId() {
         Properties props = new Properties();
@@ -409,7 +415,7 @@ public class KafkaConsumerTest {
 
         assertEquals(singleton(tp0), consumer.assignment());
 
-        AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator);
+        AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator, Errors.NONE);
 
         // heartbeat interval is 2 seconds
         time.sleep(heartbeatIntervalMs);
@@ -444,7 +450,7 @@ public class KafkaConsumerTest {
         client.poll(0, time.milliseconds());
 
         client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node);
-        AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator);
+        AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator, Errors.NONE);
 
         time.sleep(heartbeatIntervalMs);
         Thread.sleep(heartbeatIntervalMs);
@@ -661,7 +667,6 @@ public class KafkaConsumerTest {
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
-        Node node = metadata.fetch().nodes().get(0);
 
         ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
@@ -1066,12 +1071,7 @@ public class KafkaConsumerTest {
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription,
metadata, assignor, false, groupInstanceId);
 
-        // initial subscription
-        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
-
-        // verify that subscription has changed but assignment is still unchanged
-        assertEquals(singleton(topic), consumer.subscription());
-        assertEquals(Collections.emptySet(), consumer.assignment());
+        initializeSubscriptionWithSingleTopic(consumer, getConsumerRebalanceListener(consumer));
 
         // mock rebalance responses
         prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1112,6 +1112,71 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() {
+        Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
+        CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription,
metadata, assignor, false, groupInstanceId);
+
+        initializeSubscriptionWithSingleTopic(consumer, getExceptionConsumerRebalanceListener());
+
+        prepareRebalance(client, node, assignor, singletonList(tp0), null);
+
+        RuntimeException assignmentException = assertThrows(RuntimeException.class,
+            () -> consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)));
+        assertEquals(partitionAssigned + singleTopicPartition, assignmentException.getCause().getMessage());
+
+        RuntimeException unsubscribeException = assertThrows(RuntimeException.class, consumer::unsubscribe);
+        assertEquals(partitionRevoked + singleTopicPartition, unsubscribeException.getCause().getMessage());
+    }
+
+    @Test
+    public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration() throws Exception
{
+        Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
+        CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription,
metadata, assignor, false, groupInstanceId);
+
+        initializeSubscriptionWithSingleTopic(consumer, getExceptionConsumerRebalanceListener());
+        Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
+
+        RuntimeException assignException = assertThrows(RuntimeException.class,
+            () -> consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)));
+        assertEquals(partitionAssigned + singleTopicPartition, assignException.getCause().getMessage());
+
+        AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator, Errors.UNKNOWN_MEMBER_ID);
+
+        time.sleep(heartbeatIntervalMs);
+        TestUtils.waitForCondition(heartbeatReceived::get, "Heartbeat response did not occur
within timeout.");
+
+        consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
+        assertTrue(heartbeatReceived.get());
+
+        RuntimeException unsubscribeException = assertThrows(RuntimeException.class, consumer::unsubscribe);
+        assertEquals(partitionLost + singleTopicPartition, unsubscribeException.getCause().getMessage());
+    }
+
+    private void initializeSubscriptionWithSingleTopic(KafkaConsumer<String, String>
consumer,
+                                                       ConsumerRebalanceListener consumerRebalanceListener)
{
+        consumer.subscribe(singleton(topic), consumerRebalanceListener);
+        // verify that subscription has changed but assignment is still unchanged
+        assertEquals(singleton(topic), consumer.subscription());
+        assertEquals(Collections.emptySet(), consumer.assignment());
+    }
+
+    @Test
     public void testManualAssignmentChangeWithAutoCommitEnabled() {
         Time time = new MockTime();
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
@@ -1666,7 +1731,7 @@ public class KafkaConsumerTest {
             consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
             fail("Should throw exception");
         } catch (Throwable e) {
-            assertEquals("boom!", e.getCause().getMessage());
+            assertEquals(partitionAssigned + singleTopicPartition, e.getCause().getMessage());
         }
 
         // the assignment is still updated regardless of the exception
@@ -1677,7 +1742,7 @@ public class KafkaConsumerTest {
             consumer.close(Duration.ofMillis(0));
             fail("Should throw exception");
         } catch (Throwable e) {
-            assertEquals("boom!", e.getCause().getCause().getMessage());
+            assertEquals(partitionRevoked + singleTopicPartition, e.getCause().getCause().getMessage());
         }
 
         consumer.close(Duration.ofMillis(0));
@@ -1720,12 +1785,17 @@ public class KafkaConsumerTest {
         return new ConsumerRebalanceListener() {
             @Override
             public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
-                throw new RuntimeException("boom!");
+                throw new RuntimeException(partitionRevoked + partitions);
             }
 
             @Override
             public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
-                throw new RuntimeException("boom!");
+                throw new RuntimeException(partitionAssigned + partitions);
+            }
+
+            @Override
+            public void onPartitionsLost(Collection<TopicPartition> partitions) {
+                throw new RuntimeException(partitionLost + partitions);
             }
         };
     }
@@ -1779,7 +1849,7 @@ public class KafkaConsumerTest {
         return coordinator;
     }
 
-    private AtomicBoolean prepareHeartbeatResponse(MockClient client, Node coordinator) {
+    private AtomicBoolean prepareHeartbeatResponse(MockClient client, Node coordinator, Errors
error) {
         final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
         client.prepareResponseFrom(new MockClient.RequestMatcher() {
             @Override
@@ -1787,7 +1857,7 @@ public class KafkaConsumerTest {
                 heartbeatReceived.set(true);
                 return true;
             }
-        }, new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(Errors.NONE.code())),
coordinator);
+        }, new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code())),
coordinator);
         return heartbeatReceived;
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fd3411b..5ff9761 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -454,6 +454,34 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testUnsubscribeWithValidGeneration() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(
+            new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new
byte[0])));
+        coordinator.onJoinComplete(1, "memberId", partitionAssignor.name(), buffer);
+
+        coordinator.onLeavePrepare();
+        assertEquals(1, rebalanceListener.lostCount);
+        assertEquals(0, rebalanceListener.revokedCount);
+    }
+
+    @Test
+    public void testUnsubscribeWithInvalidGeneration() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        subscriptions.assignFromSubscribed(Collections.singletonList(t1p));
+
+        coordinator.onLeavePrepare();
+        assertEquals(1, rebalanceListener.lostCount);
+        assertEquals(0, rebalanceListener.revokedCount);
+    }
+
+    @Test
     public void testUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -2300,7 +2328,7 @@ public class ConsumerCoordinatorTest {
 
             MockTime time = new MockTime(1);
 
-            //onJoinPrepare will be executed and onJoinComplete will not.
+            // onJoinPrepare will be executed and onJoinComplete will not.
             boolean res = coordinator.joinGroupIfNeeded(time.timer(2));
 
             assertFalse(res);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index 3adac44..f2c75b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -36,9 +36,9 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener
{
     private final Logger log;
 
     StreamsRebalanceListener(final Time time,
-        final TaskManager taskManager,
-        final StreamThread streamThread,
-        final Logger log) {
+                             final TaskManager taskManager,
+                             final StreamThread streamThread,
+                             final Logger log) {
         this.time = time;
         this.taskManager = taskManager;
         this.streamThread = streamThread;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index a9ccbf5..82496df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -466,6 +466,20 @@ public class TaskManager {
             }
         }
 
+        log.debug("Assigning metadata with: " +
+                      "\tactiveTasks: {},\n" +
+                      "\tstandbyTasks: {}\n" +
+                      "The updated active task states are: \n" +
+                      "\tassignedActiveTasks {},\n" +
+                      "\tassignedStandbyTasks {},\n" +
+                      "\taddedActiveTasks {},\n" +
+                      "\taddedStandbyTasks {},\n" +
+                      "\trevokedActiveTasks {},\n" +
+                      "\trevokedStandbyTasks {}",
+                  activeTasks, standbyTasks,
+                  assignedActiveTasks, assignedStandbyTasks,
+                  addedActiveTasks, addedStandbyTasks,
+                  revokedActiveTasks, revokedStandbyTasks);
         this.assignedActiveTasks = activeTasks;
         this.assignedStandbyTasks = standbyTasks;
     }
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 52afe4e..06686cf 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -303,12 +303,20 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command):
+    def __init__(self, test_context, kafka, command, num_threads = 3):
         super(StreamsSmokeTestBaseService, self).__init__(test_context,
                                                           kafka,
                                                           "org.apache.kafka.streams.tests.StreamsSmokeTest",
                                                           command)
+        self.NUM_THREADS = num_threads
 
+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      streams_property.NUM_THREADS: self.NUM_THREADS}
+
+        cfg = KafkaConfig(**properties)
+        return cfg.render()
 
 class StreamsEosTestBaseService(StreamsTestBaseService):
     """Base class for Streams EOS Test services providing some common settings and functionality"""
@@ -352,8 +360,8 @@ class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
         return cmd
 
 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process")
+    def __init__(self, test_context, kafka, num_threads = 3):
+        super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process",
num_threads)
 
 class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 3d6572d..974d450 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -150,7 +150,7 @@ class StreamsBrokerBounceTest(Test):
         return True
 
         
-    def setup_system(self, start_processor=True):
+    def setup_system(self, start_processor=True, num_threads=3):
         # Setup phase
         self.zk = ZookeeperService(self.test_context, num_nodes=1)
         self.zk.start()
@@ -165,7 +165,7 @@ class StreamsBrokerBounceTest(Test):
 
         # Start test harness
         self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka,
num_threads)
 
         self.driver.start()
 
@@ -208,13 +208,16 @@ class StreamsBrokerBounceTest(Test):
     @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader", "controller"],
+            num_threads=[1, 3],
             sleep_time_secs=[120])
-    def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs):
+    def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads):
         """
         Start a smoke test client, then kill one particular broker and ensure data is still
received
-        Record if records are delivered. 
+        Record if records are delivered.
+        We also add a single thread stream client to make sure we could get all partitions
reassigned in
+        next generation so to verify the partition lost is correctly triggered.
         """
-        self.setup_system() 
+        self.setup_system(num_threads=num_threads)
 
         # Sleep to allow test to run for a bit
         time.sleep(sleep_time_secs)


Mime
View raw message