kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2421: Upgrade LZ4 to version 1.3
Date Tue, 01 Dec 2015 19:57:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0a52ddfd0 -> 69269e76a


KAFKA-2421: Upgrade LZ4 to version 1.3

A few notes on the added test:
 * I verified this test fails when changing between snappy 1.1.1.2 and 1.1.1.7 (per KAFKA-2189)
 * The hard-coded numbers are passing before and after the lz4 change

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #552 from granthenke/lz4
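
The size check described in the notes above boils down to: compress a deterministic payload and pin the exact output size, so a library upgrade that changes encoder output fails loudly. A minimal standalone sketch of that idea in plain Java (illustrative only; this class is not part of the commit, and it uses gzip from the JDK rather than Kafka's codecs):

    import java.io.ByteArrayOutputStream;
    import java.util.zip.GZIPOutputStream;

    public class CompressSizeSketch {
        public static void main(String[] args) throws Exception {
            // Deterministic payload, analogous to bytes1k in the new test.
            byte[] payload = new byte[1000];
            for (int i = 0; i < payload.length; i++)
                payload[i] = (byte) i;

            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(sink)) {
                gzip.write(payload);
            }
            // A regression test would assert an exact, hard-coded size here,
            // as testCompressSize does with 388/491/380 for gzip/snappy/lz4.
            System.out.println("gzip size = " + sink.size());
        }
    }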


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69269e76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69269e76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69269e76

Branch: refs/heads/trunk
Commit: 69269e76a43adf85a478240280c6ab3c7eef4d8e
Parents: 0a52ddf
Author: Grant Henke <granthenke@gmail.com>
Authored: Tue Dec 1 11:57:34 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Dec 1 11:57:34 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  2 +-
 .../common/record/KafkaLZ4BlockInputStream.java | 12 +++----
 .../record/KafkaLZ4BlockOutputStream.java       | 18 +++++-----
 .../kafka/message/MessageCompressionTest.scala  | 37 +++++++++++++++-----
 4 files changed, 45 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69269e76/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 9dde14f..e24279e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -408,7 +408,7 @@ project(':clients') {
   dependencies {
     compile "$slf4japi"
     compile 'org.xerial.snappy:snappy-java:1.1.2'
-    compile 'net.jpountz.lz4:lz4:1.2.0'
+    compile 'net.jpountz.lz4:lz4:1.3'
 
     testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52'
     testCompile "$junit"

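As a quick sanity check after a version bump like this, lz4-java can report at runtime which implementation it selected (a minimal sketch; LZ4Factory is part of lz4-java's public API, but the class below is illustrative and not in this commit):

    import net.jpountz.lz4.LZ4Factory;

    public class Lz4Probe {
        public static void main(String[] args) {
            // fastestInstance() prefers the JNI binding and falls back to a pure-Java one.
            LZ4Factory factory = LZ4Factory.fastestInstance();
            System.out.println(factory);
        }
    }
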
http://git-wip-us.apache.org/repos/asf/kafka/blob/69269e76/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index f480da2..372d4f4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -37,7 +37,7 @@ import net.jpountz.xxhash.XXHashFactory;
 
 /**
  * A partial implementation of the v1.4.1 LZ4 Frame format.
- * 
+ *
  * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
  *      Format Spec</a>
  */
@@ -61,7 +61,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
 
     /**
      * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
-     * 
+     *
      * @param in The stream to decompress
      * @throws IOException
      */
@@ -80,7 +80,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
 
     /**
      * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
-     * 
+     *
      * @throws IOException
      */
     private void readHeader() throws IOException {
@@ -111,7 +111,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
     /**
     * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum, and writes the
      * result to a buffer.
-     * 
+     *
      * @throws IOException
      */
     private void readBlock() throws IOException {
@@ -174,7 +174,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
 
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
-        net.jpountz.util.Utils.checkRange(b, off, len);
+        net.jpountz.util.SafeUtils.checkRange(b, off, len);
         if (finished) {
             return -1;
         }

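The only functional change in this file is the rename visible in the last hunk: lz4 1.3 moved checkRange from net.jpountz.util.Utils to net.jpountz.util.SafeUtils (the release appears to have split those helpers into safe and unsafe variants). The bounds check itself is equivalent to the following illustrative reimplementation (not the lz4-java source):

    public class CheckRangeSketch {
        // Rejects any (off, len) window that does not fit inside buf.
        static void checkRange(byte[] buf, int off, int len) {
            if (len < 0 || off < 0 || off + len > buf.length)
                throw new ArrayIndexOutOfBoundsException(
                        "invalid range [" + off + ", " + (off + len) + ") for length " + buf.length);
        }

        public static void main(String[] args) {
            checkRange(new byte[8], 0, 8);  // fits: no exception
            checkRange(new byte[8], 4, 8);  // 4 + 8 > 8: throws
        }
    }
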
http://git-wip-us.apache.org/repos/asf/kafka/blob/69269e76/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 6a2231f..7d23f4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -30,7 +30,7 @@ import net.jpountz.xxhash.XXHashFactory;
 
 /**
  * A partial implementation of the v1.4.1 LZ4 Frame format.
- * 
+ *
  * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
  *      Format Spec</a>
  */
@@ -59,7 +59,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
 
     /**
      * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
-     * 
+     *
      * @param out The output stream to compress
     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
      *            values will generate an exception
@@ -83,7 +83,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
 
     /**
      * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
-     * 
+     *
      * @param out The stream to compress
     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
      *            values will generate an exception
@@ -95,7 +95,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
 
     /**
      * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
-     * 
+     *
      * @param out The output stream to compress
      * @throws IOException
      */
@@ -105,7 +105,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
 
     /**
      * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
-     * 
+     *
      * @throws IOException
      */
     private void writeHeader() throws IOException {
@@ -126,7 +126,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     /**
     * Compresses buffered data, optionally computes an XXHash32 checksum, and writes the result to the underlying
      * {@link OutputStream}.
-     * 
+     *
      * @throws IOException
      */
     private void writeBlock() throws IOException {
@@ -160,7 +160,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     /**
     * Similar to the {@link #writeBlock()} method. Writes a 0-length block (without block checksum) to signal the end
     * of the block stream.
-     * 
+     *
      * @throws IOException
      */
     private void writeEndMark() throws IOException {
@@ -180,7 +180,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
-        net.jpountz.util.Utils.checkRange(b, off, len);
+        net.jpountz.util.SafeUtils.checkRange(b, off, len);
         ensureNotFinished();
 
         int bufferRemainingLength = maxBlockSize - bufferOffset;

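For orientation, here is a minimal usage sketch of the stream touched above, based only on the constructors and javadoc visible in this diff (blockSize 4 selects 64kb blocks; the class below is illustrative and not part of the commit):

    import java.io.ByteArrayOutputStream;
    import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream;

    public class Lz4FrameWriteSketch {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            // blockSize 4 = 64kb per the constructor javadoc above.
            try (KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(sink, 4)) {
                out.write("hello lz4 frame".getBytes());
            } // closing flushes the last block and the 0-length end mark (see writeEndMark()).
            System.out.println("framed bytes: " + sink.size());
        }
    }
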
http://git-wip-us.apache.org/repos/asf/kafka/blob/69269e76/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index f45bead..d8613f7 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -24,7 +24,7 @@ import org.junit._
 import org.junit.Assert._
 
 class MessageCompressionTest extends JUnitSuite {
-  
+
   @Test
   def testSimpleCompressDecompress() {
     val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec)
@@ -35,19 +35,40 @@ class MessageCompressionTest extends JUnitSuite {
     for(codec <- codecs)
       testSimpleCompressDecompress(codec)
   }
-  
+
+  //  A quick test to ensure any growth or increase in compression size is known when upgrading libraries
+  @Test
+  def testCompressSize() {
+    val bytes1k: Array[Byte] = (0 until 1000).map(_.toByte).toArray
+    val bytes2k: Array[Byte] = (1000 until 2000).map(_.toByte).toArray
+    val bytes3k: Array[Byte] = (3000 until 4000).map(_.toByte).toArray
+    val messages: List[Message] = List(new Message(bytes1k), new Message(bytes2k), new Message(bytes3k))
+
+    testCompressSize(GZIPCompressionCodec, messages, 388)
+
+    if(isSnappyAvailable)
+      testCompressSize(SnappyCompressionCodec, messages, 491)
+
+    if(isLZ4Available)
+      testCompressSize(LZ4CompressionCodec, messages, 380)
+  }
+
   def testSimpleCompressDecompress(compressionCodec: CompressionCodec) {
     val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes))
     val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages:_*)
-    assertEquals(compressionCodec, messageSet.shallowIterator.next.message.compressionCodec)
+    assertEquals(compressionCodec, messageSet.shallowIterator.next().message.compressionCodec)
     val decompressed = messageSet.iterator.map(_.message).toList
     assertEquals(messages, decompressed)
   }
 
-  
-  def isSnappyAvailable(): Boolean = {
+  def testCompressSize(compressionCodec: CompressionCodec, messages: List[Message], expectedSize: Int) {
+    val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages:_*)
+    assertEquals(s"$compressionCodec size has changed.", expectedSize, messageSet.sizeInBytes)
+  }
+
+  def isSnappyAvailable: Boolean = {
     try {
-      val snappy = new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream())
+      new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream())
       true
     } catch {
       case e: UnsatisfiedLinkError => false
@@ -55,9 +76,9 @@ class MessageCompressionTest extends JUnitSuite {
     }
   }
 
-  def isLZ4Available(): Boolean = {
+  def isLZ4Available: Boolean = {
     try {
-      val lz4 = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream())
+      new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream())
       true
     } catch {
       case e: UnsatisfiedLinkError => false

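The availability probes at the end of the test translate directly to Java; a hedged rendering of the same pattern (construct the codec's stream once and treat a failed native-library load as "not available"; this class is illustrative, not part of the commit):

    import java.io.ByteArrayOutputStream;

    public class CodecProbeSketch {
        static boolean isSnappyAvailable() {
            try {
                new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream());
                return true;
            } catch (UnsatisfiedLinkError e) {
                return false;   // native snappy library could not be loaded
            } catch (Exception e) {
                return false;   // any other codec initialization failure
            }
        }

        public static void main(String[] args) {
            System.out.println("snappy available: " + isSnappyAvailable());
        }
    }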
