kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5097; Fix regression in consumer caused by unsafe access to potentially unassigned partitions
Date Thu, 20 Apr 2017 20:43:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 a26731674 -> 2aa615545


KAFKA-5097; Fix regression in consumer caused by unsafe access to potentially unassigned partitions

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jiangjie Qin <becket.qin@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #2876 from enothereska/KAFKA-4755-fetcher-only
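
For context, a minimal, self-contained sketch (not Kafka source; the Subscriptions class and names below are illustrative stand-ins) of the unsafe pattern this commit removes: the old drain loop called subscriptions.position(partition) eagerly, just to pre-fill exception metadata, and that lookup throws if the partition has since been unassigned (e.g. after a rebalance). The fix instead carries the partition and offset already attached to the completed fetch.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: models why looking up a position for an unassigned
// partition fails, and why reusing the fetch's own offset is safe.
public class UnassignedPartitionSketch {

    // Stand-in for the consumer's subscription state: position() throws
    // when the partition is no longer assigned.
    static class Subscriptions {
        private final Map<String, Long> positions = new HashMap<>();

        void assign(String partition, long offset) {
            positions.put(partition, offset);
        }

        void unassign(String partition) {
            positions.remove(partition);
        }

        long position(String partition) {
            Long pos = positions.get(partition);
            if (pos == null)
                throw new IllegalStateException("No current assignment for partition " + partition);
            return pos;
        }
    }

    public static void main(String[] args) {
        Subscriptions subscriptions = new Subscriptions();
        subscriptions.assign("topic-0", 42L);
        // A rebalance happens between the fetch completing and the records being drained.
        subscriptions.unassign("topic-0");

        // Old pattern: look up the position up front, purely for exception bookkeeping.
        try {
            long fetchedOffset = subscriptions.position("topic-0"); // throws here
            System.out.println("old pattern got offset " + fetchedOffset);
        } catch (IllegalStateException e) {
            System.out.println("old pattern failed: " + e.getMessage());
        }

        // Fixed pattern: reuse the offset that arrived with the completed fetch itself,
        // so no lookup against possibly-stale assignment state is needed
        // (analogous to completedFetch.fetchedOffset in the patch below).
        long completedFetchOffset = 42L;
        System.out.println("fixed pattern uses offset " + completedFetchOffset);
    }
}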


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2aa61554
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2aa61554
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2aa61554

Branch: refs/heads/0.10.2
Commit: 2aa615545514cd9d2be0c998e685fad10927d208
Parents: a267316
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Thu Apr 20 13:10:00 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Apr 20 13:10:00 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 61 +++++++++-----------
 1 file changed, 27 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2aa61554/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c8bbfa3..e2631b5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -472,46 +472,39 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
 
         Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
         int recordsRemaining = maxPollRecords;
-        TopicPartition fetchedPartition = null;
-        long fetchedOffsets = -1L;
-
-        try {
-            while (recordsRemaining > 0) {
-                if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
-                    CompletedFetch completedFetch = completedFetches.poll();
-                    if (completedFetch == null) break;
-
-                    fetchedPartition = completedFetch.partition;
-                    fetchedOffsets = completedFetch.fetchedOffset;
+        while (recordsRemaining > 0) {
+            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
+                CompletedFetch completedFetch = completedFetches.poll();
+                if (completedFetch == null) break;
+                try {
                     nextInLineRecords = parseCompletedFetch(completedFetch);
-                } else {
-                    TopicPartition partition = nextInLineRecords.partition;
-                    fetchedPartition = partition;
-                    fetchedOffsets = subscriptions.position(partition);
-                    List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
-                    if (!records.isEmpty()) {
-                        List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
-                        if (currentRecords == null) {
-                            drained.put(partition, records);
-                        } else {
-                            // this case shouldn't usually happen because we only send one fetch at a time per partition,
-                            // but it might conceivably happen in some rare cases (such as partition leader changes).
-                            // we have to copy to a new list because the old one may be immutable
-                            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-                            newRecords.addAll(currentRecords);
-                            newRecords.addAll(records);
-                            drained.put(partition, newRecords);
-                        }
-                        recordsRemaining -= records.size();
+                } catch (KafkaException e) {
+                    if (drained.isEmpty())
+                        throw e;
+                    nextInLineExceptionMetadata = new ExceptionMetadata(completedFetch.partition, completedFetch.fetchedOffset, e);
+                }
+            } else {
+                TopicPartition partition = nextInLineRecords.partition;
+                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
+                if (!records.isEmpty()) {
+                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
+                    if (currentRecords == null) {
+                        drained.put(partition, records);
+                    } else {
+                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
+                        // but it might conceivably happen in some rare cases (such as partition leader changes).
+                        // we have to copy to a new list because the old one may be immutable
+                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
+                        newRecords.addAll(currentRecords);
+                        newRecords.addAll(records);
+                        drained.put(partition, newRecords);
                     }
+                    recordsRemaining -= records.size();
                 }
             }
-        } catch (KafkaException e) {
-            if (drained.isEmpty())
-                throw e;
-            nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffsets, e);
         }
 
+
         return drained;
     }
 

