kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9645: Fallback to unsubscribe during Task Migrated (#8220)
Date Sat, 07 Mar 2020 16:09:01 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe0b704  KAFKA-9645: Fallback to unsubscribe during Task Migrated (#8220)
fe0b704 is described below

commit fe0b704285ebc916ce5080a5248d91b4dc3c60e0
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Sat Mar 7 08:08:23 2020 -0800

    KAFKA-9645: Fallback to unsubscribe during Task Migrated (#8220)
    
    After #7312, we could still return data during the rebalance phase, which means it could be possible to find records without corresponding tasks. We have to fallback to the unsubscribe mode during task migrated as the assignment should be cleared out to keep sync with task manager state.
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/consumer/MockConsumer.java       | 24 ++++-----
 .../streams/processor/internals/StreamThread.java  |  5 +-
 .../processor/internals/StreamThreadTest.java      | 61 ++++++++++++++++++++++
 3 files changed, 76 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index b8579c4..33702c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -66,6 +66,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private KafkaException offsetsException;
     private AtomicBoolean wakeup;
     private boolean closed;
+    private boolean shouldRebalance;
 
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
         this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy);
@@ -79,6 +80,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.pollException = null;
         this.wakeup = new AtomicBoolean(false);
         this.committed = new HashMap<>();
+        this.shouldRebalance = false;
     }
 
     @Override
@@ -356,21 +358,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         subscriptions.requestOffsetReset(partitions, OffsetResetStrategy.LATEST);
     }
 
-    // needed for cases where you make a second call to endOffsets
-    public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) {
-        innerUpdateEndOffsets(newOffsets, false);
-    }
-
     public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
-        innerUpdateEndOffsets(newOffsets, true);
-    }
-
-    private void innerUpdateEndOffsets(final Map<TopicPartition, Long> newOffsets,
-                                       final boolean replace) {
-
         for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
             List<Long> offsets = endOffsets.get(entry.getKey());
-            if (replace || offsets == null) {
+            if (offsets == null) {
                 offsets = new ArrayList<>();
             }
             offsets.add(entry.getValue());
@@ -568,6 +559,15 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public void enforceRebalance() {
+        shouldRebalance = true;
+    }
+
+    public boolean shouldRebalance() {
+        return shouldRebalance;
+    }
+
+    public void resetShouldRebalance() {
+        shouldRebalance = false;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d67fff8..7b6ac83 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -508,11 +508,12 @@ public class StreamThread extends Thread {
                     "Will close out all assigned tasks and rejoin the consumer group.");
 
                 taskManager.handleLostAll();
-                mainConsumer.enforceRebalance();
+                mainConsumer.unsubscribe();
+                subscribeConsumer();
             }
         }
     }
-    
+
     private void subscribeConsumer() {
         if (builder.usesPatternSubscription()) {
             mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 13a669d..1bb4c9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -808,6 +809,66 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldNotReturnDataAfterTaskMigrated() {
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+
+        internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class);
+
+        EasyMock.expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2);
+
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        consumer.subscribe(Collections.singletonList(topic1), new MockRebalanceListener());
+        consumer.rebalance(Collections.singletonList(t1p1));
+        consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L));
+        consumer.seekToEnd(Collections.singletonList(t1p1));
+
+        final ChangelogReader changelogReader = new MockChangelogReader() {
+
+            @Override
+            public void restore() {
+                consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0]));
+                consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0]));
+
+                throw new TaskMigratedException(
+                    "Changelog restore found task migrated", new RuntimeException("restore task migrated"));
+            }
+        };
+
+        taskManager.handleLostAll();
+
+        EasyMock.replay(taskManager, internalTopologyBuilder);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
+
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            changelogReader,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger()
+        ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+        final IllegalStateException thrown = assertThrows(
+            IllegalStateException.class, thread::run);
+
+        EasyMock.verify(taskManager);
+
+        // The Mock consumer shall throw as the assignment has been wiped out, but records are assigned.
+        assertEquals("No current assignment for partition topic1-1", thrown.getMessage());
+        assertFalse(consumer.shouldRebalance());
+    }
+
+    @Test
     public void shouldShutdownTaskManagerOnCloseWithoutStart() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);


Mime
View raw message