kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4549; Call writeBlock before writeEndMark in KafkaLZ4BlockOutputStream.close()
Date Wed, 28 Dec 2016 12:51:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8e1395b17 -> 88a439ed8


KAFKA-4549; Call writeBlock before writeEndMark in KafkaLZ4BlockOutputStream.close()

Author: MURAKAMI Masahiko <fossamagna2@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2265 from fossamagna/fix-lz4outputstream-close


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/88a439ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/88a439ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/88a439ed

Branch: refs/heads/trunk
Commit: 88a439ed8e78803ae47783c3e5a75d4e4cad2a77
Parents: 8e1395b
Author: MURAKAMI Masahiko <fossamagna2@gmail.com>
Authored: Wed Dec 28 12:36:45 2016 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Dec 28 12:49:00 2016 +0000

----------------------------------------------------------------------
 .../record/KafkaLZ4BlockOutputStream.java       |  1 +
 .../kafka/common/record/KafkaLZ4Test.java       | 40 ++++++++++++++------
 2 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/88a439ed/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 933b2cf..515c09a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -259,6 +259,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     @Override
     public void close() throws IOException {
         if (!finished) {
+            writeBlock();
             writeEndMark();
             flush();
             finished = true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/88a439ed/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
index 37877ef..47aebcb 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
@@ -41,11 +41,13 @@ public class KafkaLZ4Test {
     private final boolean useBrokenFlagDescriptorChecksum;
     private final boolean ignoreFlagDescriptorChecksum;
     private final byte[] payload;
+    private final boolean close;
 
-    public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, byte[] payload) {
+    public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, byte[] payload, boolean close) {
         this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
         this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
         this.payload = payload;
+        this.close = close;
     }
 
     @Parameters
@@ -55,7 +57,8 @@ public class KafkaLZ4Test {
         List<Object[]> values = new ArrayList<Object[]>();
         for (boolean broken : Arrays.asList(false, true))
             for (boolean ignore : Arrays.asList(false, true))
-                values.add(new Object[] {broken, ignore, payload});
+                for (boolean close : Arrays.asList(false, true))
+                    values.add(new Object[] {broken, ignore, payload, close});
         return values;
     }
 
@@ -64,26 +67,30 @@ public class KafkaLZ4Test {
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(output, this.useBrokenFlagDescriptorChecksum);
         lz4.write(this.payload, 0, this.payload.length);
-        lz4.flush();
+        if (this.close) {
+            lz4.close();
+        } else {
+            lz4.flush();
+        }
         byte[] compressed = output.toByteArray();
 
         // Check magic bytes stored as little-endian
         int offset = 0;
-        assertEquals(compressed[offset++], 0x04);
-        assertEquals(compressed[offset++], 0x22);
-        assertEquals(compressed[offset++], 0x4D);
-        assertEquals(compressed[offset++], 0x18);
+        assertEquals(0x04, compressed[offset++]);
+        assertEquals(0x22, compressed[offset++]);
+        assertEquals(0x4D, compressed[offset++]);
+        assertEquals(0x18, compressed[offset++]);
 
         // Check flg descriptor
         byte flg = compressed[offset++];
 
         // 2-bit version must be 01
         int version = (flg >>> 6) & 3;
-        assertEquals(version, 1);
+        assertEquals(1, version);
 
         // Reserved bits should always be 0
         int reserved = flg & 3;
-        assertEquals(reserved, 0);
+        assertEquals(0, reserved);
 
         // Check block descriptor
         byte bd = compressed[offset++];
@@ -96,9 +103,9 @@ public class KafkaLZ4Test {
 
         // Multiple reserved bit ranges in block descriptor
         reserved = bd & 15;
-        assertEquals(reserved, 0);
+        assertEquals(0, reserved);
         reserved = (bd >>> 7) & 1;
-        assertEquals(reserved, 0);
+        assertEquals(0, reserved);
 
         // If flg descriptor sets content size flag
         // there are 8 additional bytes before checksum
@@ -121,7 +128,16 @@ public class KafkaLZ4Test {
         int hash = XXHashFactory.fastestInstance().hash32().hash(compressed, off, len, 0);
 
         byte hc = compressed[offset++];
-        assertEquals(hc, (byte) ((hash >> 8) & 0xFF));
+        assertEquals((byte) ((hash >> 8) & 0xFF), hc);
+
+        // Check EndMark, data block with size `0` expressed as a 32-bits value
+        if (this.close) {
+            offset = compressed.length - 4;
+            assertEquals(0, compressed[offset++]);
+            assertEquals(0, compressed[offset++]);
+            assertEquals(0, compressed[offset++]);
+            assertEquals(0, compressed[offset++]);
+        }
 
         ByteArrayInputStream input = new ByteArrayInputStream(compressed);
         try {


Mime
View raw message