kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Better messaging for invalid fetch response (#6427)
Date Tue, 12 Mar 2019 22:24:42 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fd79dd0  MINOR: Better messaging for invalid fetch response (#6427)
fd79dd0 is described below

commit fd79dd060812af0531428cb2f7fd4aa74ea8124a
Author: José Armando García Sancio <jsancio@users.noreply.github.com>
AuthorDate: Tue Mar 12 15:24:24 2019 -0700

    MINOR: Better messaging for invalid fetch response (#6427)
    
    Users have reported (KAFKA-7565) that when consumer poll wake up is used,
    it is possible to receive fetch responses that don't match the copied topic
    partitions collection for the session when the fetch request was created.
    
    This commit improves the error handling here by throwing an
    IllegalStateException instead of a NullPointerException, and by
    generating a message for the exception that includes a bit more
    information.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 32 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 8ac5730..64bc921 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -68,6 +68,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
+import org.slf4j.helpers.MessageFormatter;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
@@ -241,13 +242,30 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
 
                                 for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>>
entry : response.responseData().entrySet()) {
                                     TopicPartition partition = entry.getKey();
-                                    long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
-                                    FetchResponse.PartitionData<Records> fetchData
= entry.getValue();
-
-                                    log.debug("Fetch {} at offset {} for partition {} returned
fetch data {}",
-                                            isolationLevel, fetchOffset, partition, fetchData);
-                                    completedFetches.add(new CompletedFetch(partition, fetchOffset,
fetchData, metricAggregator,
-                                            resp.requestHeader().apiVersion()));
+                                    FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
+                                    if (requestData == null) {
+                                        String message;
+                                        if (data.metadata().isFull()) {
+                                            message = MessageFormatter.arrayFormat(
+                                                    "Response for missing full request partition:
partition={}; metadata={}",
+                                                    new Object[]{partition, data.metadata()}).getMessage();
+                                        } else {
+                                            message = MessageFormatter.arrayFormat(
+                                                    "Response for missing session request
partition: partition={}; metadata={}; toSend={}; toForget={}",
+                                                    new Object[]{partition, data.metadata(),
data.toSend(), data.toForget()}).getMessage();
+                                        }
+
+                                        // Received fetch response for missing session partition
+                                        throw new IllegalStateException(message);
+                                    } else {
+                                        long fetchOffset = requestData.fetchOffset;
+                                        FetchResponse.PartitionData<Records> fetchData
= entry.getValue();
+
+                                        log.debug("Fetch {} at offset {} for partition {}
returned fetch data {}",
+                                                isolationLevel, fetchOffset, partition, fetchData);
+                                        completedFetches.add(new CompletedFetch(partition,
fetchOffset, fetchData, metricAggregator,
+                                                    resp.requestHeader().apiVersion()));
+                                    }
                                 }
 
                                 sensors.fetchLatency.record(resp.requestLatencyMs());


Mime
View raw message