kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.0 updated: MINOR: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after poll (#5306)
Date Thu, 28 Jun 2018 17:20:26 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 495e385  MINOR: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after poll (#5306)
495e385 is described below

commit 495e3857f77236555ae897da1dd0a63f14861bd3
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Jun 28 10:19:42 2018 -0700

    MINOR: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after poll (#5306)
    
    Before KIP-266, consumer.poll(0) would call updateAssignmentMetadataIfNeeded(Long.MAX_VALUE), which makes sure that the rebalance is definitely completed, i.e. both onPartitionRevoked and onPartitionAssigned called within this poll(0). After KIP-266, however, it is possible that only onPartitionRevoked will be called if timeout is elapsed. And hence we need to double check that state is still PARTITIONS_ASSIGNED after the consumer.poll(duration) call.
    
    Reviewers: Ted Yu <yuzhihong@gmail.com>, Matthias J. Sax <matthias@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  | 27 +++++++++++++---------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a159e7b..77538ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -804,21 +804,26 @@ public class StreamThread extends Thread {
             // try to fetch some records with zero poll millis
             // to unblock the restoration as soon as possible
             records = pollRequests(Duration.ZERO);
+        } else if (state == State.PARTITIONS_REVOKED) {
+            // try to fetch some records with normal poll time
+            // in order to wait long enough to get the join response
+            records = pollRequests(pollTime);
+        } else if (state == State.RUNNING) {
+            // try to fetch some records with normal poll time
+            // in order to get long polling
+            records = pollRequests(pollTime);
+        } else {
+            // any other state should not happen
+            log.error("Unexpected state {} during normal iteration", state);
+            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
+        }
 
+        // only try to initialize the assigned tasks
+        // if the state is still in PARTITION_ASSIGNED after the poll call
+        if (state == State.PARTITIONS_ASSIGNED) {
             if (taskManager.updateNewAndRestoringTasks()) {
                 setState(State.RUNNING);
             }
-        } else {
-            // try to fetch some records if necessary
-            records = pollRequests(pollTime);
-
-            // if state changed after the poll call,
-            // try to initialize the assigned tasks again
-            if (state == State.PARTITIONS_ASSIGNED) {
-                if (taskManager.updateNewAndRestoringTasks()) {
-                    setState(State.RUNNING);
-                }
-            }
         }
 
         if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) {
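
The re-check described in the commit message can be illustrated with a small, self-contained sketch. This is not the StreamThread code: the class PollStateRecheckSketch, its poll() and restoreTasks() helpers, and the stateAfterPoll parameter are hypothetical stand-ins for consumer.poll(duration), taskManager.updateNewAndRestoringTasks(), and the effect of the rebalance callbacks. It only assumes that a poll with a bounded timeout may return after the revoke callback has run but before the assign callback has.

```java
// Minimal model of "poll first, then re-check the state before restoring".
// All names here are illustrative assumptions, not Kafka APIs.
class PollStateRecheckSketch {
    enum State { PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING }

    State state;
    int restoreAttempts = 0;

    PollStateRecheckSketch(State initial) {
        this.state = initial;
    }

    // Stand-in for consumer.poll(duration): rebalance callbacks invoked inside
    // the poll may move the thread to a different state before it returns.
    void poll(State stateAfterPoll) {
        state = stateAfterPoll;
    }

    // Stand-in for taskManager.updateNewAndRestoringTasks(); here restoration
    // is assumed to finish immediately.
    boolean restoreTasks() {
        restoreAttempts++;
        return true;
    }

    // One loop iteration: poll, then resume restoration only if the state is
    // still PARTITIONS_ASSIGNED once the poll has returned.
    State runOnce(State stateAfterPoll) {
        poll(stateAfterPoll);
        if (state == State.PARTITIONS_ASSIGNED) {
            if (restoreTasks()) {
                state = State.RUNNING;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // The poll returned after only the revoke callback ran: no restoration.
        PollStateRecheckSketch interrupted = new PollStateRecheckSketch(State.PARTITIONS_ASSIGNED);
        System.out.println(interrupted.runOnce(State.PARTITIONS_REVOKED));

        // The assignment survived the poll: restoration runs and completes.
        PollStateRecheckSketch stable = new PollStateRecheckSketch(State.PARTITIONS_ASSIGNED);
        System.out.println(stable.runOnce(State.PARTITIONS_ASSIGNED));
    }
}
```

In this model the first case leaves the state at PARTITIONS_REVOKED with no restore attempt, while the second transitions to RUNNING, which mirrors the single post-poll check the patch moves out of the individual branches.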

