kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5078; PartitionRecords.fetchRecords(...) should defer exception to the next call if iterator has already moved across any valid record
Date Fri, 12 May 2017 03:03:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 12aa70d55 -> 911c768bc


KAFKA-5078; PartitionRecords.fetchRecords(...) should defer exception to the next call if
iterator has already moved across any valid record

Suppose there are two valid records followed by one invalid record in the FetchResponse.PartitionData().
In the current implementation, PartitionRecords.fetchRecords(...) will throw an exception without
returning the two valid records. The next call to PartitionRecords.fetchRecords(...) will
not return those two valid records either because the iterator has already moved across them.

We can fix this problem by deferring the exception to the next call of PartitionRecords.fetchRecords(...)
if the iterator has already moved across any valid record.

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>,
Jiangjie Qin <becket.qin@gmail.com>

Closes #2864 from lindong28/KAFKA-5078


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/911c768b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/911c768b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/911c768b

Branch: refs/heads/trunk
Commit: 911c768bcc3ba950d63536ce4e6ba2542542d573
Parents: 12aa70d
Author: Dong Lin <lindong28@gmail.com>
Authored: Thu May 11 20:03:30 2017 -0700
Committer: Jiangjie Qin <becket.qin@gmail.com>
Committed: Thu May 11 20:03:30 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 35 ++++++++++++++------
 .../clients/consumer/internals/FetcherTest.java | 22 +++++++++---
 2 files changed, 43 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/911c768b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index bf5df95..66221c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -278,7 +278,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
                 subscriptions.seek(tp, committed);
             }
         }
-        
+
         if (!needsOffsetReset.isEmpty()) {
             resetOffsets(needsOffsetReset);
         }
@@ -559,6 +559,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
         } catch (KafkaException e) {
             if (fetched.isEmpty())
                 throw e;
+            // To be thrown in the next call of this method
             nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffset,
e);
         }
         return fetched;
@@ -944,6 +945,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
         private CloseableIterator<Record> records;
         private long nextFetchOffset;
         private boolean isFetched = false;
+        private KafkaException nextInlineException;
 
         private PartitionRecords(TopicPartition partition,
                                  CompletedFetch completedFetch,
@@ -954,12 +956,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
             this.nextFetchOffset = completedFetch.fetchedOffset;
             this.abortedProducerIds = new HashSet<>();
             this.abortedTransactions = abortedTransactions(completedFetch.partitionData);
+            this.nextInlineException = null;
         }
 
         private void drain() {
             if (!isFetched) {
                 maybeCloseRecordStream();
-
+                nextInlineException = null;
                 this.isFetched = true;
                 this.completedFetch.metricAggregator.record(partition, bytesRead, recordsRead);
 
@@ -1043,16 +1046,28 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
         private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
             if (isFetched)
                 return Collections.emptyList();
+            if (nextInlineException != null) {
+                KafkaException e = nextInlineException;
+                nextInlineException = null;
+                throw e;
+            }
 
             List<ConsumerRecord<K, V>> records = new ArrayList<>();
-            for (int i = 0; i < maxRecords; i++) {
-                Record record = nextFetchedRecord();
-                if (record == null)
-                    break;
-
-                recordsRead++;
-                bytesRead += record.sizeInBytes();
-                records.add(parseRecord(partition, currentBatch, record));
+            try {
+                for (int i = 0; i < maxRecords; i++) {
+                    Record record = nextFetchedRecord();
+                    if (record == null)
+                        break;
+
+                    recordsRead++;
+                    bytesRead += record.sizeInBytes();
+                    records.add(parseRecord(partition, currentBatch, record));
+                }
+            } catch (KafkaException e) {
+                if (records.isEmpty())
+                    throw e;
+                // To be thrown in the next call of this method
+                nextInlineException = e;
             }
             return records;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/911c768b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4b957a3..743568d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -293,10 +293,15 @@ public class FetcherTest {
         LegacyRecord.write(out, magic, crc, LegacyRecord.computeAttributes(magic, CompressionType.NONE,
TimestampType.CREATE_TIME), timestamp, key, value);
 
         // and one invalid record (note the crc)
-        out.writeLong(offset);
+        out.writeLong(offset + 1);
         out.writeInt(size);
         LegacyRecord.write(out, magic, crc + 1, LegacyRecord.computeAttributes(magic, CompressionType.NONE,
TimestampType.CREATE_TIME), timestamp, key, value);
 
+        // write one valid record
+        out.writeLong(offset + 2);
+        out.writeInt(size);
+        LegacyRecord.write(out, magic, crc, LegacyRecord.computeAttributes(magic, CompressionType.NONE,
TimestampType.CREATE_TIME), timestamp, key, value);
+
         buffer.flip();
 
         subscriptions.assignFromUser(singleton(tp1));
@@ -306,13 +311,22 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE,
100L, 0));
         consumerClient.poll(0);
+
+        // the first fetchedRecords() should return the first valid message
+        assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
+        assertEquals(1, subscriptions.position(tp1).longValue());
+
+        // the second fetchedRecords() should throw exception due to the second invalid message
         try {
             fetcher.fetchedRecords();
-            fail("fetchedRecords should have raised");
+            fail("fetchedRecords should have raised KafkaException");
         } catch (KafkaException e) {
-            // the position should not advance since no data has been returned
-            assertEquals(0, subscriptions.position(tp1).longValue());
+            assertEquals(1, subscriptions.position(tp1).longValue());
         }
+
+        // the third fetchedRecords() should return the third valid message
+        assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
+        assertEquals(3, subscriptions.position(tp1).longValue());
     }
 
     @Test


Mime
View raw message