kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/2] kafka git commit: MINOR: Return `false` instead of exception from `Record.isValid` if buffer is too small
Date Thu, 28 Jul 2016 11:47:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ff557f02a -> b457b0c4f


MINOR: Return `false` instead of exception from `Record.isValid` if buffer is too small

Also add tests and make `Crc32.update` perform the same argument checks as
`java.util.zip.CRC32`.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e6f0d5ff
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e6f0d5ff
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e6f0d5ff

Branch: refs/heads/trunk
Commit: e6f0d5ffc1844586e53e59e4946142fcc7bc5179
Parents: ff557f0
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Jul 28 12:01:37 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Jul 28 12:01:37 2016 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/common/record/Record.java  |  2 +-
 .../org/apache/kafka/common/utils/Crc32.java    |  3 ++
 .../kafka/common/record/SimpleRecordTest.java   | 55 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f0d5ff/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 77e4f68..73b314c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -256,7 +256,7 @@ public final class Record {
      * Returns true if the crc stored with the record matches the crc computed off the record
contents
      */
     public boolean isValid() {
-        return checksum() == computeChecksum();
+        return size() >= CRC_LENGTH && checksum() == computeChecksum();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f0d5ff/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
index caa0058..48af070 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java
@@ -72,6 +72,9 @@ public class Crc32 implements Checksum {
 
     @Override
     public void update(byte[] b, int off, int len) {
+        if (off < 0 || len < 0 || off > b.length - len)
+            throw new ArrayIndexOutOfBoundsException();
+
         int localCrc = crc;
 
         while (len > 7) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6f0d5ff/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
new file mode 100644
index 0000000..d7551d8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SimpleRecordTest {
+
+    /* This scenario can happen if the record size field is corrupt and we end up allocating
a buffer that is too small */
+    @Test
+    public void testIsValidWithTooSmallBuffer() {
+        ByteBuffer buffer = ByteBuffer.allocate(2);
+        Record record = new Record(buffer);
+        assertFalse(record.isValid());
+    }
+
+    @Test
+    public void testIsValidWithChecksumMismatch() {
+        ByteBuffer buffer = ByteBuffer.allocate(4);
+        // set checksum
+        buffer.putInt(2);
+        Record record = new Record(buffer);
+        assertFalse(record.isValid());
+    }
+
+    @Test
+    public void testIsValidWithFourBytesBuffer() {
+        ByteBuffer buffer = ByteBuffer.allocate(4);
+        Record record = new Record(buffer);
+        // It is a bit weird that we return `true` in this case, we could extend the definition
of `isValid` to the
+        // something like the following to detect a clearly corrupt record:
+        // return checksum() == computeChecksum() && size() >= recordSize(0, 0);
+        assertTrue(record.isValid());
+    }
+
+}


Mime
View raw message