kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5630; Consumer should block on corrupt records and keep throwing an exception
Date Tue, 01 Aug 2017 16:04:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 37ada7957 -> 86bfbd0ce


KAFKA-5630; Consumer should block on corrupt records and keep throwing an exception

This patch handles the case where a CorruptRecordException is thrown directly from the iterator.
The fix is a little tricky because exceptions can be thrown in a few different scenarios. The
approach taken here is to let the same record go through exactly the same process as on the
previous attempt when an exception is thrown, so the exception is raised at the same step each
time. The only complication is that the iterator state changes once it throws an exception. To
handle that, we cache the first iterator exception and attach it as the cause of the
KafkaException thrown on subsequent fetches.
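
For readers who hit this behavior, a minimal sketch (not part of this commit) of how an application
might act on the "seek past the record" advice follows: the consumer keeps throwing on the same
corrupt record, so the partition position stays at that record's offset and seeking to position + 1
skips it. The sketch assumes a single manually assigned partition and a hypothetical process()
handler; SerializationException extends KafkaException, so one catch covers both failure modes.
Note that, as the new test below shows, a record whose size field itself is corrupted cannot always
be skipped within its batch.

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    public class SkipCorruptRecordSketch {

        // Sketch only: poll loop that deliberately seeks past a record the consumer is stuck on.
        static void pollSkippingCorruptRecords(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp) {
            consumer.assign(Collections.singletonList(tp));
            while (true) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                    for (ConsumerRecord<byte[], byte[]> record : records)
                        process(record);  // hypothetical application handler
                } catch (KafkaException e) {
                    // The consumer is blocked on the corrupt record, so its position is still
                    // that record's offset; seek one past it to resume consumption.
                    long corruptOffset = consumer.position(tp);
                    consumer.seek(tp, corruptOffset + 1);
                }
            }
        }

        private static void process(ConsumerRecord<byte[], byte[]> record) {
            // application-specific handling (placeholder)
        }
    }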

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3573 from becketqin/KAFKA-5630

(cherry picked from commit cd4c6c0183edfbf8e7e8ce526d638e31fcd09186)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86bfbd0c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86bfbd0c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86bfbd0c

Branch: refs/heads/0.11.0
Commit: 86bfbd0ce6ae968d373af135754a68de4dc0c6af
Parents: 37ada79
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Tue Aug 1 08:56:23 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Tue Aug 1 09:03:49 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 59 +++++++++++---------
 .../clients/consumer/internals/FetcherTest.java | 41 +++++++++++---
 2 files changed, 67 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/86bfbd0c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 2703823..54ccf2e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -923,7 +923,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                                         key, value, headers);
         } catch (RuntimeException e) {
             throw new SerializationException("Error deserializing key/value for partition " + partition +
-                    " at offset " + record.offset(), e);
+                    " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
         }
     }
 
@@ -955,7 +955,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         private CloseableIterator<Record> records;
         private long nextFetchOffset;
         private boolean isFetched = false;
-        private boolean hasExceptionInLastFetch;
+        private Exception cachedRecordException = null;
+        private boolean corruptLastRecord = false;
 
         private PartitionRecords(TopicPartition partition,
                                  CompletedFetch completedFetch,
@@ -966,13 +967,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             this.nextFetchOffset = completedFetch.fetchedOffset;
             this.abortedProducerIds = new HashSet<>();
             this.abortedTransactions = abortedTransactions(completedFetch.partitionData);
-            this.hasExceptionInLastFetch = false;
         }
 
         private void drain() {
             if (!isFetched) {
                 maybeCloseRecordStream();
-                hasExceptionInLastFetch = false;
+                cachedRecordException = null;
                 this.isFetched = true;
                 this.completedFetch.metricAggregator.record(partition, bytesRead, recordsRead);
 
@@ -1013,15 +1013,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         }
 
         private Record nextFetchedRecord() {
-            if (hasExceptionInLastFetch) {
-                if (lastRecord == null) {
-                    maybeEnsureValid(currentBatch);
-                } else {
-                    maybeEnsureValid(lastRecord);
-                    return lastRecord;
-                }
-            }
-
             while (true) {
                 if (records == null || !records.hasNext()) {
                     maybeCloseRecordStream();
@@ -1038,7 +1029,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                         return null;
                     }
 
-                    lastRecord = null;
                     currentBatch = batches.next();
                     maybeEnsureValid(currentBatch);
 
@@ -1053,8 +1043,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                             abortedProducerIds.remove(producerId);
                         } else if (isBatchAborted(currentBatch)) {
                             log.debug("Skipping aborted record batch from partition {} with
producerId {} and " +
-                                            "offsets {} to {}",
-                                    partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
+                                          "offsets {} to {}",
+                                      partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
                             nextFetchOffset = currentBatch.nextOffset();
                             continue;
                         }
@@ -1063,7 +1053,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     records = currentBatch.streamingIterator(decompressionBufferSupplier);
                 } else {
                     Record record = records.next();
-                    lastRecord = record;
                     // skip any records out of range
                     if (record.offset() >= nextFetchOffset) {
                         // we only do validation when the message should not be skipped.
@@ -1082,25 +1071,45 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         }
 
         private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
+            // Error when fetching the next record before deserialization.
+            if (corruptLastRecord)
+                throw new KafkaException("Received exception when fetching the next record from " + partition
+                                             + ". If needed, please seek past the record to "
+                                             + "continue consumption.", cachedRecordException);
+
             if (isFetched)
                 return Collections.emptyList();
 
             List<ConsumerRecord<K, V>> records = new ArrayList<>();
             try {
                 for (int i = 0; i < maxRecords; i++) {
-                    Record record = nextFetchedRecord();
-                    if (record == null)
+                    // Only move to next record if there was no exception in the last fetch. Otherwise we should
+                    // use the last record to do deserialization again.
+                    if (cachedRecordException == null) {
+                        corruptLastRecord = true;
+                        lastRecord = nextFetchedRecord();
+                        corruptLastRecord = false;
+                    }
+                    if (lastRecord == null)
                         break;
-
-                    records.add(parseRecord(partition, currentBatch, record));
+                    records.add(parseRecord(partition, currentBatch, lastRecord));
                     recordsRead++;
-                    bytesRead += record.sizeInBytes();
-                    nextFetchOffset = record.offset() + 1;
+                    bytesRead += lastRecord.sizeInBytes();
+                    nextFetchOffset = lastRecord.offset() + 1;
+                    // In some cases, the deserialization may have thrown an exception and the retry may succeed,
+                    // we allow user to move forward in this case.
+                    cachedRecordException = null;
                 }
+            } catch (SerializationException se) {
+                cachedRecordException = se;
+                if (records.isEmpty())
+                    throw se;
             } catch (KafkaException e) {
-                hasExceptionInLastFetch = true;
+                cachedRecordException = e;
                 if (records.isEmpty())
-                    throw e;
+                    throw new KafkaException("Received exception when fetching the next record from " + partition
+                                                 + ". If needed, please seek past the record to "
+                                                 + "continue consumption.", e);
             }
             return records;
         }
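
As a reading aid, here is a simplified, stand-alone sketch of the control flow the hunk above
introduces; the class and helper names are placeholders, not the real Fetcher internals.
corruptLastRecord marks a failure while advancing the iterator itself (which cannot be retried
safely, so subsequent fetches rethrow immediately), while cachedRecordException remembers a
deserialization failure so that the same record is retried on the next call.

    import java.util.ArrayList;
    import java.util.List;

    // Simplified sketch of the caching pattern; RawRecord, Parsed and the two helpers stand in
    // for Record, ConsumerRecord and the corresponding Fetcher methods.
    class DrainSketch {
        static class RawRecord { }
        static class Parsed { }

        private RuntimeException cachedRecordException = null; // deserialization failed; retry same record
        private boolean corruptLastRecord = false;             // advancing the iterator failed; cannot retry
        private RawRecord lastRecord;

        List<Parsed> fetchRecords(int maxRecords) {
            if (corruptLastRecord)
                throw new RuntimeException("Stuck on a corrupt record; seek past it to continue.",
                        cachedRecordException);
            List<Parsed> out = new ArrayList<>();
            try {
                for (int i = 0; i < maxRecords; i++) {
                    // Only advance when the previous record completed; otherwise retry deserializing it.
                    if (cachedRecordException == null) {
                        corruptLastRecord = true;      // assume the worst while advancing the iterator
                        lastRecord = nextFetchedRecord();
                        corruptLastRecord = false;     // advancing succeeded
                    }
                    if (lastRecord == null)
                        break;
                    out.add(parseRecord(lastRecord));  // may throw on deserialization
                    cachedRecordException = null;      // record fully handled; allow moving forward
                }
            } catch (RuntimeException e) {
                cachedRecordException = e;             // remember it so the next call retries or rethrows
                if (out.isEmpty())
                    throw e;
            }
            return out;
        }

        RawRecord nextFetchedRecord() { return null; }            // placeholder for iterator advancement
        Parsed parseRecord(RawRecord r) { return new Parsed(); }  // placeholder for deserialization
    }

The real code additionally distinguishes SerializationException (rethrown as-is) from other
KafkaExceptions (wrapped with the partition and a seek hint), and updates nextFetchOffset and the
fetch metrics as records are consumed.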

http://git-wip-us.apache.org/repos/asf/kafka/blob/86bfbd0c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0801979..315500a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -265,7 +265,7 @@ public class FetcherTest {
             public byte[] deserialize(String topic, byte[] data) {
                 if (i++ % 2 == 1) {
                     // Should be blocked on the value deserialization of the first record.
-                    assertEquals(new String(data, StandardCharsets.UTF_8), "value-1");
+                    assertEquals("value-1", new String(data, StandardCharsets.UTF_8));
                     throw new SerializationException();
                 }
                 return data;
@@ -294,7 +294,7 @@ public class FetcherTest {
     }
 
     @Test
-    public void testParseInvalidRecord() throws Exception {
+    public void testParseCorruptedRecord() throws Exception {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
 
@@ -323,6 +323,15 @@ public class FetcherTest {
         out.writeInt(size);
         LegacyRecord.write(out, magic, crc, LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
 
+        // Write a record whose size field is invalid.
+        out.writeLong(offset + 3);
+        out.writeInt(1);
+
+        // write one valid record
+        out.writeLong(offset + 4);
+        out.writeInt(size);
+        LegacyRecord.write(out, magic, crc, LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
+
         buffer.flip();
 
         subscriptions.assignFromUser(singleton(tp1));
@@ -337,28 +346,44 @@ public class FetcherTest {
         assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
         assertEquals(1, subscriptions.position(tp1).longValue());
 
-        // the fetchedRecords() should always throw exception due to the second invalid message
+        ensureBlockOnRecord(1L);
+        seekAndConsumeRecord(buffer, 2L);
+        ensureBlockOnRecord(3L);
+        try {
+            // For a record that cannot be retrieved from the iterator, we cannot seek over it within the batch.
+            seekAndConsumeRecord(buffer, 4L);
+            fail("Should have thrown exception when fail to retrieve a record from iterator.");
+        } catch (KafkaException ke) {
+           // let it go
+        }
+        ensureBlockOnRecord(4L);
+    }
+
+    private void ensureBlockOnRecord(long blockedOffset) {
+        // the fetchedRecords() should always throw exception due to the invalid message at the starting offset.
         for (int i = 0; i < 2; i++) {
             try {
                 fetcher.fetchedRecords();
                 fail("fetchedRecords should have raised KafkaException");
             } catch (KafkaException e) {
-                assertEquals(1, subscriptions.position(tp1).longValue());
+                assertEquals(blockedOffset, subscriptions.position(tp1).longValue());
             }
         }
+    }
 
+    private void seekAndConsumeRecord(ByteBuffer responseBuffer, long toOffset) {
         // Seek to skip the bad record and fetch again.
-        subscriptions.seek(tp1, 2);
+        subscriptions.seek(tp1, toOffset);
         // Should not throw exception after the seek.
         fetcher.fetchedRecords();
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(responseBuffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
         List<ConsumerRecord<byte[], byte[]>> records = fetcher.fetchedRecords().get(tp1);
         assertEquals(1, records.size());
-        assertEquals(2L, records.get(0).offset());
-        assertEquals(3, subscriptions.position(tp1).longValue());
+        assertEquals(toOffset, records.get(0).offset());
+        assertEquals(toOffset + 1, subscriptions.position(tp1).longValue());
     }
 
     @Test

