kafka-commits mailing list archives

From gwens...@apache.org
Subject kafka git commit: MINOR: More graceful handling of buffers that are too small in Record's `isValid` and `ensureValid`
Date Wed, 07 Sep 2016 00:34:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 286411cbb -> d2acd676c


MINOR: More graceful handling of buffers that are too small in Record's `isValid` and `ensureValid`

Also add tests and make `Crc32.update` perform the same argument checks as
`java.util.zip.CRC32`.
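
For illustration, a minimal sketch of the caller-visible behaviour after this change. The 2-byte buffer and the printing are made up for the example; before this patch, reading the checksum from such a buffer would presumably fail with an unchecked exception from the underlying ByteBuffer access rather than with InvalidRecordException.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.InvalidRecordException;
    import org.apache.kafka.common.record.Record;

    public class RecordValidityExample {
        public static void main(String[] args) {
            // A record backed by a buffer smaller than the 4-byte CRC field; this can
            // happen if a corrupt size field causes too small a buffer to be allocated.
            Record record = new Record(ByteBuffer.allocate(2));

            // isValid() now returns false instead of failing while reading the CRC.
            System.out.println(record.isValid()); // false

            // ensureValid() now throws InvalidRecordException with a descriptive message.
            try {
                record.ensureValid();
            } catch (InvalidRecordException e) {
                System.out.println(e.getMessage());
            }
        }
    }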

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira

Closes #1672 from ijuma/record-is-valid-should-be-more-robust


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2acd676
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2acd676
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2acd676

Branch: refs/heads/trunk
Commit: d2acd676c3eb0c11d0042bc3b9ae314165c68443
Parents: 286411c
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Sep 6 17:34:46 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Sep 6 17:34:46 2016 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 15 +++--
 .../org/apache/kafka/common/record/Record.java  | 15 +++--
 .../org/apache/kafka/common/utils/Crc32.java    |  3 +
 .../kafka/common/record/SimpleRecordTest.java   | 66 ++++++++++++++++++++
 4 files changed, 87 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d2acd676/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index aa5cdbe..eb876a5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
@@ -617,12 +618,14 @@ public class Fetcher<K, V> {
     private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
         Record record = logEntry.record();
 
-        if (this.checkCrcs && !record.isValid())
-            throw new KafkaException("Record for partition " + partition + " at offset "
-                    + logEntry.offset() + " is corrupt (stored crc = " + record.checksum()
-                    + ", computed crc = "
-                    + record.computeChecksum()
-                    + ")");
+        if (this.checkCrcs) {
+            try {
+                record.ensureValid();
+            } catch (InvalidRecordException e) {
+                throw new KafkaException("Record for partition " + partition + " at offset " + logEntry.offset()
+                        + " is invalid, cause: " + e.getMessage());
+            }
+        }
 
         try {
             long offset = logEntry.offset();
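
As a sketch of what this means for a consumer with check.crcs enabled (the broker address, group id, and topic below are placeholders, and the exception handling is only illustrative): a corrupt record still surfaces as a KafkaException from poll(), but its message now carries the cause reported by ensureValid() instead of trying to read a checksum that may not even fit in the buffer.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.KafkaException;

    public class CheckCrcsExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
            props.put("group.id", "crc-check-demo");          // placeholder group id
            props.put("check.crcs", "true");                  // defaults to true; shown for clarity
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                    // process records ...
                } catch (KafkaException e) {
                    // With this patch the message includes the cause from ensureValid(), e.g.
                    // "... is invalid, cause: Record is corrupt (crc could not be retrieved ...)"
                    System.err.println(e.getMessage());
                }
            }
        }
    }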

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2acd676/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 77e4f68..09cb80d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -256,18 +256,21 @@ public final class Record {
      * Returns true if the crc stored with the record matches the crc computed off the record contents
      */
     public boolean isValid() {
-        return checksum() == computeChecksum();
+        return size() >= CRC_LENGTH && checksum() == computeChecksum();
     }
 
     /**
      * Throw an InvalidRecordException if isValid is false for this record
      */
     public void ensureValid() {
-        if (!isValid())
-            throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
-                                             + ", computed crc = "
-                                             + computeChecksum()
-                                             + ")");
+        if (!isValid()) {
+            if (size() < CRC_LENGTH)
+                throw new InvalidRecordException("Record is corrupt (crc could not be retrieved as the record is too "
+                        + "small, size = " + size() + ")");
+            else
+                throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+                        + ", computed crc = " + computeChecksum() + ")");
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2acd676/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
index caa0058..48af070 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
@@ -72,6 +72,9 @@ public class Crc32 implements Checksum {
 
     @Override
     public void update(byte[] b, int off, int len) {
+        if (off < 0 || len < 0 || off > b.length - len)
+            throw new ArrayIndexOutOfBoundsException();
+
         int localCrc = crc;
 
         while (len > 7) {
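
For reference, a small sketch of the parity this adds with java.util.zip.CRC32 (the byte array and offsets are made up): both implementations now reject an out-of-range (off, len) pair up front with an ArrayIndexOutOfBoundsException, rather than leaving it to whatever the unrolled loop happens to read.

    import org.apache.kafka.common.utils.Crc32;

    public class Crc32ArgumentCheckExample {
        public static void main(String[] args) {
            byte[] data = new byte[8];

            // java.util.zip.CRC32 rejects an (off, len) pair that runs past the array.
            try {
                new java.util.zip.CRC32().update(data, 4, 8);
            } catch (ArrayIndexOutOfBoundsException e) {
                System.out.println("java.util.zip.CRC32: " + e);
            }

            // With this patch, Kafka's Crc32 performs the same bounds check before
            // touching the data.
            try {
                new Crc32().update(data, 4, 8);
            } catch (ArrayIndexOutOfBoundsException e) {
                System.out.println("kafka Crc32: " + e);
            }
        }
    }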

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2acd676/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
new file mode 100644
index 0000000..aabadfe
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SimpleRecordTest {
+
+    /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */
+    @Test
+    public void testIsValidWithTooSmallBuffer() {
+        ByteBuffer buffer = ByteBuffer.allocate(2);
+        Record record = new Record(buffer);
+        assertFalse(record.isValid());
+        try {
+            record.ensureValid();
+            fail("InvalidRecordException should have been thrown");
+        } catch (InvalidRecordException e) { }
+    }
+
+    @Test
+    public void testIsValidWithChecksumMismatch() {
+        ByteBuffer buffer = ByteBuffer.allocate(4);
+        // set checksum
+        buffer.putInt(2);
+        Record record = new Record(buffer);
+        assertFalse(record.isValid());
+        try {
+            record.ensureValid();
+            fail("InvalidRecordException should have been thrown");
+        } catch (InvalidRecordException e) { }
+    }
+
+    @Test
+    public void testIsValidWithFourBytesBuffer() {
+        ByteBuffer buffer = ByteBuffer.allocate(4);
+        Record record = new Record(buffer);
+        // it is a bit weird that we return `true` in this case, we could extend the definition of `isValid` to
+        // something like the following to detect a clearly corrupt record:
+        // return size() >= recordSize(0, 0) && checksum() == computeChecksum();
+        assertTrue(record.isValid());
+        // no exception should be thrown
+        record.ensureValid();
+    }
+
+}

