This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new be0f10e MINOR: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED
after poll (#5306)
be0f10e is described below
commit be0f10e1902961daa51b543f293f7038dc6e4344
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Jun 28 10:19:42 2018 -0700
MINOR: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after
poll (#5306)
Before KIP-266, consumer.poll(0) would call updateAssignmentMetadataIfNeeded(Long.MAX_VALUE),
which makes sure that the rebalance is definitely completed, i.e. both onPartitionRevoked
and onPartitionAssigned called within this poll(0). After KIP-266, however, it is possible
that only onPartitionRevoked will be called if timeout is elapsed. And hence we need to double
check that state is still PARTITIONS_ASSIGNED after the consumer.poll(duration) call.
Reviewers: Ted Yu <yuzhihong@gmail.com>, Matthias J. Sax <matthias@confluent.io>
---
.../streams/processor/internals/StreamThread.java | 27 +++++++++++++---------
1 file changed, 16 insertions(+), 11 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a159e7b..77538ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -804,21 +804,26 @@ public class StreamThread extends Thread {
// try to fetch some records with zero poll millis
// to unblock the restoration as soon as possible
records = pollRequests(Duration.ZERO);
+ } else if (state == State.PARTITIONS_REVOKED) {
+ // try to fetch some records with normal poll time
+ // in order to wait long enough to get the join response
+ records = pollRequests(pollTime);
+ } else if (state == State.RUNNING) {
+ // try to fetch some records with normal poll time
+ // in order to get long polling
+ records = pollRequests(pollTime);
+ } else {
+ // any other state should not happen
+ log.error("Unexpected state {} during normal iteration", state);
+ throw new StreamsException(logPrefix + "Unexpected state " + state + " during
normal iteration");
+ }
+ // only try to initialize the assigned tasks
+ // if the state is still in PARTITION_ASSIGNED after the poll call
+ if (state == State.PARTITIONS_ASSIGNED) {
if (taskManager.updateNewAndRestoringTasks()) {
setState(State.RUNNING);
}
- } else {
- // try to fetch some records if necessary
- records = pollRequests(pollTime);
-
- // if state changed after the poll call,
- // try to initialize the assigned tasks again
- if (state == State.PARTITIONS_ASSIGNED) {
- if (taskManager.updateNewAndRestoringTasks()) {
- setState(State.RUNNING);
- }
- }
}
if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks())
{
|