kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: kafka-1493; Use a well-documented LZ4 compression format and remove redundant LZ4HC option; patched by James Oliver; reviewed by Jun Rao
Date Fri, 17 Oct 2014 17:10:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 f084127a3 -> ee1bab755


kafka-1493; Use a well-documented LZ4 compression format and remove redundant LZ4HC option;
patched by James Oliver; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee1bab75
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee1bab75
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee1bab75

Branch: refs/heads/0.8.2
Commit: ee1bab755fdc87c3aa2c505b4abecc52ac280641
Parents: f084127
Author: James Oliver <jdo500@gmail.com>
Authored: Fri Oct 17 10:10:08 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Oct 17 10:10:08 2014 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/ProducerConfig.java  |   2 +-
 .../message/KafkaLZ4BlockInputStream.java       | 233 +++++++++++
 .../message/KafkaLZ4BlockOutputStream.java      | 387 +++++++++++++++++++
 .../kafka/common/record/CompressionType.java    |   6 +-
 .../apache/kafka/common/record/Compressor.java  |  21 +-
 .../org/apache/kafka/common/utils/Utils.java    |  60 +++
 config/producer.properties                      |   4 +-
 .../scala/kafka/message/CompressionCodec.scala  |   7 -
 .../kafka/message/CompressionFactory.scala      |  13 +-
 .../scala/kafka/tools/ConsoleProducer.scala     |   2 +-
 .../src/main/scala/kafka/tools/PerfConfig.scala |   2 +-
 .../kafka/api/ProducerCompressionTest.scala     |   5 +-
 .../kafka/message/MessageCompressionTest.scala  |  12 -
 .../scala/unit/kafka/message/MessageTest.scala  |   2 +-
 14 files changed, 698 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index bf4ed66..9095caf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -153,7 +153,7 @@ public class ProducerConfig extends AbstractConfig {
 
     /** <code>compression.type</code> */
     public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
-    private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>lz4hc</code>. "
+    private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. "
                                                        + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
 
     /** <code>metrics.sample.window.ms</code> */
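
Not part of the patch, but for orientation: a minimal sketch of selecting the
remaining codec through this config, mirroring the 0.8.2-era raw-typed producer
usage in ProducerCompressionTest below (broker address and topic are
illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // "lz4" is still accepted; "lz4hc" is rejected after this commit
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
    KafkaProducer producer = new KafkaProducer(props);
    producer.send(new ProducerRecord("test", null, "hello".getBytes()));
    producer.close();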

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
new file mode 100644
index 0000000..5be72fe
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.message;
+
+import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
+import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.MAGIC;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.FLG;
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * 
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
+ */
+public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+
+  public static final String PREMATURE_EOS = "Stream ended prematurely";
+  public static final String NOT_SUPPORTED = "Stream unsupported";
+  public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
+  public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
+  
+  private final LZ4SafeDecompressor decompressor;
+  private final XXHash32 checksum;
+  private final byte[] buffer;
+  private final byte[] compressedBuffer;
+  private final int maxBlockSize;
+  private FLG flg;
+  private BD bd;
+  private int bufferOffset;
+  private int bufferSize;
+  private boolean finished;
+  
+  /**
+   * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+   * 
+   * @param in The stream to decompress
+   * @throws IOException
+   */
+  public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+    super(in);
+    decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+    checksum = XXHashFactory.fastestInstance().hash32();
+    readHeader();
+    maxBlockSize = bd.getBlockMaximumSize();
+    buffer = new byte[maxBlockSize];
+    compressedBuffer = new byte[maxBlockSize];
+    bufferOffset = 0;
+    bufferSize = 0;
+    finished = false;
+  }
+  
+  /**
+   * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+   *  
+   * @throws IOException
+   */
+  private void readHeader() throws IOException {
+    byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
+    
+    // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
+    bufferOffset = 6;
+    if (in.read(header, 0, bufferOffset) != bufferOffset) {
+      throw new IOException(PREMATURE_EOS);
+    }
+
+    if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset-6)) {
+      throw new IOException(NOT_SUPPORTED);
+    }
+    flg = FLG.fromByte(header[bufferOffset-2]);
+    bd = BD.fromByte(header[bufferOffset-1]);
+    // TODO read uncompressed content size, update flg.validate()
+    // TODO read dictionary id, update flg.validate()
+    
+    // check stream descriptor hash
+    byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
+    header[bufferOffset++] = (byte) in.read();
+    if (hash != header[bufferOffset-1]) {
+      throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+    }
+  }
+
+  /**
+   * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum, 
+   * and writes the result to a buffer.
+   * 
+   * @throws IOException
+   */
+  private void readBlock() throws IOException {
+    int blockSize = Utils.readUnsignedIntLE(in);
+
+    // Check for EndMark
+    if (blockSize == 0) {
+      finished = true;
+      // TODO implement content checksum, update flg.validate()
+      return;
+    } else if (blockSize > maxBlockSize) {
+      throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
+    }
+    
+    boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+    byte[] bufferToRead;
+    if (compressed) {
+      bufferToRead = compressedBuffer;
+    } else {
+      blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
+      bufferToRead = buffer;
+      bufferSize = blockSize;
+    }
+    
+    if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+      throw new IOException(PREMATURE_EOS);
+    }
+
+    // verify checksum
+    if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+      throw new IOException(BLOCK_HASH_MISMATCH);
+    }
+
+    if (compressed) {
+      try {
+        bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+      } catch (LZ4Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    bufferOffset = 0;
+  }
+ 
+  @Override
+  public int read() throws IOException {
+    if (finished) {
+      return -1;
+    }
+    if (available() == 0) {
+      readBlock();
+    }
+    if (finished) {
+      return -1;
+    }
+    int value = buffer[bufferOffset++] & 0xFF;
+    
+    return value;
+  }
+
+  @Override
+  public int read(byte b[], int off, int len) throws IOException {
+    net.jpountz.util.Utils.checkRange(b, off, len);
+    if (finished) {
+      return -1;
+    }
+    if (available() == 0) {
+      readBlock();
+    }
+    if (finished) {
+      return -1;
+    }
+    len = Math.min(len, available());
+    System.arraycopy(buffer, bufferOffset, b, off, len);
+    bufferOffset += len;
+    return len;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    if (finished) {
+      return 0;
+    }
+    if (available() == 0) {
+      readBlock();
+    }
+    if (finished) {
+      return 0;
+    }
+    n = Math.min(n, available());
+    bufferOffset += n;
+    return n;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return bufferSize - bufferOffset;
+  }
+
+  @Override
+  public void close() throws IOException {
+      in.close();
+  }
+
+  @Override
+  public synchronized void mark(int readlimit) {
+      throw new RuntimeException("mark not supported");
+  }
+
+  @Override
+  public synchronized void reset() throws IOException {
+      throw new RuntimeException("reset not supported");
+  }
+
+  @Override
+  public boolean markSupported() {
+      return false;
+  }
+
+}
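
For reference, the fixed-size frame header that readHeader() consumes (and that
writeHeader() in KafkaLZ4BlockOutputStream below produces) is laid out as
follows; note that, as implemented here, the descriptor checksum hashes all six
preceding bytes, including the magic:

    bytes 0-3 : magic number 0x184D2204, little-endian
    byte  4   : FLG  (version bits = 01, block independence = 1,
                      optional block-checksum flag)
    byte  5   : BD   (block maximum size code, 4..7 => 64kb..4mb)
    byte  6   : (xxhash32(header[0..5], seed 0) >> 8) & 0xFF  -- descriptor checksum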

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
new file mode 100644
index 0000000..e5b9e43
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.message;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * 
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
+ */
+public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+
+  public static final int MAGIC = 0x184D2204;
+  public static final int LZ4_MAX_HEADER_LENGTH = 19;
+  public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
+  
+  public static final String CLOSED_STREAM = "The stream is already closed";
+  
+  public static final int BLOCKSIZE_64KB = 4;
+  public static final int BLOCKSIZE_256KB = 5;
+  public static final int BLOCKSIZE_1MB = 6;
+  public static final int BLOCKSIZE_4MB = 7;
+  
+  private final LZ4Compressor compressor;
+  private final XXHash32 checksum;
+  private final FLG flg;
+  private final BD bd;
+  private final byte[] buffer;
+  private final byte[] compressedBuffer;
+  private final int maxBlockSize;
+  private int bufferOffset;
+  private boolean finished;
+
+  /**
+   * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+   *
+   * @param out The output stream to compress
+   * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
+   * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for every block of data
+   * @throws IOException
+   */
+  public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+    super(out);
+    compressor = LZ4Factory.fastestInstance().fastCompressor();
+    checksum = XXHashFactory.fastestInstance().hash32();
+    bd = new BD(blockSize);
+    flg = new FLG(blockChecksum);
+    bufferOffset = 0;
+    maxBlockSize = bd.getBlockMaximumSize();
+    buffer = new byte[maxBlockSize];
+    compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
+    finished = false;
+    writeHeader();
+  }
+  
+  /**
+   * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+   *  
+   * @param out The stream to compress
+   * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
+   * @throws IOException
+   */
+  public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
+    this(out, blockSize, false);
+  }
+  
+  /**
+   * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+   * 
+   * @param out The output stream to compress
+   * @throws IOException
+   */
+  public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
+    this(out, BLOCKSIZE_64KB);
+  }
+
+  /**
+   * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
+   *  
+   * @throws IOException
+   */
+  private void writeHeader() throws IOException {
+    Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
+    bufferOffset = 4;
+    buffer[bufferOffset++] = flg.toByte();
+    buffer[bufferOffset++] = bd.toByte();
+    // TODO write uncompressed content size, update flg.validate()
+    // TODO write dictionary id, update flg.validate()
+    // compute checksum on all descriptor fields 
+    int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
+    buffer[bufferOffset++] = (byte) hash;
+    // write out frame descriptor
+    out.write(buffer, 0, bufferOffset);
+    bufferOffset = 0;
+  }
+  
+  /**
+   * Compresses buffered data, optionally computes an XXHash32 checksum, and writes
+   * the result to the underlying {@link OutputStream}.
+   * 
+   * @throws IOException
+   */
+  private void writeBlock() throws IOException {
+    if (bufferOffset == 0) {
+      return;
+    }
+    
+    int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
+    byte[] bufferToWrite = compressedBuffer;
+    int compressMethod = 0;
+    
+    // Store block uncompressed if compressed length is greater (incompressible)
+    if (compressedLength >= bufferOffset) {
+      bufferToWrite = buffer;
+      compressedLength = bufferOffset;
+      compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
+    }
+
+    // Write content
+    Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+    out.write(bufferToWrite, 0, compressedLength);
+    
+    // Calculate and write block checksum
+    if (flg.isBlockChecksumSet()) {
+      int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
+      Utils.writeUnsignedIntLE(out, hash);
+    }
+    bufferOffset = 0;
+  }
+  
+  /**
+   * Similar to the {@link #writeBlock()} method.  Writes a 0-length block 
+   * (without block checksum) to signal the end of the block stream.
+   * 
+   * @throws IOException
+   */
+  private void writeEndMark() throws IOException {
+    Utils.writeUnsignedIntLE(out, 0);
+    // TODO implement content checksum, update flg.validate()
+    finished = true;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    ensureNotFinished();
+    if (bufferOffset == maxBlockSize) {
+      writeBlock();
+    }
+    buffer[bufferOffset++] = (byte) b;
+  }
+  
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    net.jpountz.util.Utils.checkRange(b, off, len);
+    ensureNotFinished();
+     
+    int bufferRemainingLength = maxBlockSize - bufferOffset;
+    // while b will fill the buffer
+    while (len > bufferRemainingLength) {
+      // fill remaining space in buffer 
+      System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
+      bufferOffset = maxBlockSize;
+      writeBlock();
+      // compute new offset and length
+      off += bufferRemainingLength;
+      len -= bufferRemainingLength;
+      bufferRemainingLength = maxBlockSize;
+    }
+    
+    System.arraycopy(b, off, buffer, bufferOffset, len);
+    bufferOffset += len;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (!finished) {
+        writeBlock();
+    }
+    if (out != null) {
+      out.flush();
+    }
+  }
+
+  /**
+   * A simple state check to ensure the stream is still open.
+   */
+  private void ensureNotFinished() {
+    if (finished) {
+      throw new IllegalStateException(CLOSED_STREAM);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!finished) {
+      writeEndMark();
+      flush();
+      finished = true;
+    }
+    if (out != null) {
+      out.close();
+      out = null;
+    }
+  }
+
+  public static class FLG {
+    
+    private static final int VERSION = 1;
+    
+    private final int presetDictionary;
+    private final int reserved1;
+    private final int contentChecksum;
+    private final int contentSize;
+    private final int blockChecksum;
+    private final int blockIndependence;
+    private final int version;
+    
+    public FLG() {
+      this(false);
+    }
+    
+    public FLG(boolean blockChecksum) {
+      this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
+    }
+    
+    private FLG(int presetDictionary, int reserved1, int contentChecksum, 
+        int contentSize, int blockChecksum, int blockIndependence, int version) {
+      this.presetDictionary = presetDictionary;
+      this.reserved1 = reserved1;
+      this.contentChecksum = contentChecksum;
+      this.contentSize = contentSize;
+      this.blockChecksum = blockChecksum;
+      this.blockIndependence = blockIndependence;
+      this.version = version;
+      validate();
+    }
+    
+    public static FLG fromByte(byte flg) {
+      int presetDictionary =  (flg >>> 0) & 1;
+      int reserved1 =         (flg >>> 1) & 1;
+      int contentChecksum =   (flg >>> 2) & 1;
+      int contentSize =       (flg >>> 3) & 1;
+      int blockChecksum =     (flg >>> 4) & 1;
+      int blockIndependence = (flg >>> 5) & 1;
+      int version =           (flg >>> 6) & 3;
+      
+      return new FLG(presetDictionary, reserved1, contentChecksum, 
+          contentSize, blockChecksum, blockIndependence, version);
+    }
+    
+    public byte toByte() {
+      return (byte) (
+            ((presetDictionary   & 1) << 0)
+          | ((reserved1          & 1) << 1)
+          | ((contentChecksum    & 1) << 2)
+          | ((contentSize        & 1) << 3)
+          | ((blockChecksum      & 1) << 4)
+          | ((blockIndependence  & 1) << 5)
+          | ((version            & 3) << 6) );
+    }
+    
+    private void validate() {
+      if (presetDictionary != 0) {
+        throw new RuntimeException("Preset dictionary is unsupported");
+      }
+      if (reserved1 != 0) {
+        throw new RuntimeException("Reserved1 field must be 0");
+      }
+      if (contentChecksum != 0) {
+        throw new RuntimeException("Content checksum is unsupported");
+      }
+      if (contentSize != 0) {
+        throw new RuntimeException("Content size is unsupported");
+      }
+      if (blockIndependence != 1) {
+        throw new RuntimeException("Dependent block stream is unsupported");
+      }
+      if (version != VERSION) {
+        throw new RuntimeException(String.format("Version %d is unsupported", version));
+      }
+    }
+    
+    public boolean isPresetDictionarySet() {
+      return presetDictionary == 1;
+    }
+    
+    public boolean isContentChecksumSet() {
+      return contentChecksum == 1;
+    }
+    
+    public boolean isContentSizeSet() {
+      return contentSize == 1;
+    }
+    
+    public boolean isBlockChecksumSet() {
+      return blockChecksum == 1;
+    }
+    
+    public boolean isBlockIndependenceSet() {
+      return blockIndependence == 1;
+    }
+    
+    public int getVersion() {
+      return version;
+    }
+  }
+  
+  public static class BD {
+    
+    private final int reserved2;
+    private final int blockSizeValue;
+    private final int reserved3;
+    
+    public BD() {
+      this(0, BLOCKSIZE_64KB, 0);
+    }
+    
+    public BD(int blockSizeValue) {
+      this(0, blockSizeValue, 0);
+    }
+    
+    private BD(int reserved2, int blockSizeValue, int reserved3) {
+      this.reserved2 = reserved2;
+      this.blockSizeValue = blockSizeValue;
+      this.reserved3 = reserved3;
+      validate();
+    }
+    
+    public static BD fromByte(byte bd) {
+      int reserved2 =        (bd >>> 0) & 15;
+      int blockMaximumSize = (bd >>> 4) & 7;
+      int reserved3 =        (bd >>> 7) & 1;
+      
+      return new BD(reserved2, blockMaximumSize, reserved3);
+    }
+    
+    private void validate() {
+      if (reserved2 != 0) {
+        throw new RuntimeException("Reserved2 field must be 0");
+      }
+      if (blockSizeValue < 4 || blockSizeValue > 7) {
+        throw new RuntimeException("Block size value must be between 4 and 7");
+      }
+      if (reserved3 != 0) {
+        throw new RuntimeException("Reserved3 field must be 0");
+      }
+    }
+    
+    // 2^(2n+8)
+    public int getBlockMaximumSize() {
+      return (1 << ((2 * blockSizeValue) + 8));
+    }
+    
+    public byte toByte() {
+      return (byte) (
+            ((reserved2       & 15) << 0)
+          | ((blockSizeValue  & 7) << 4)
+          | ((reserved3       & 1) << 7) );
+    }
+  }
+  
+}
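
A minimal round-trip sketch using the two new classes (not part of the patch;
assumes lz4-java on the classpath, as the streams above require):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.kafka.common.message.KafkaLZ4BlockInputStream;
    import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream;

    public class KafkaLZ4RoundTrip {
        public static void main(String[] args) throws Exception {
            byte[] payload = "hello, lz4 framing".getBytes("UTF-8");

            // compress with 64kb blocks and per-block checksums enabled
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(
                sink, KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, true);
            out.write(payload, 0, payload.length);
            out.close(); // flushes the final block and writes the end mark

            // the input constructor validates magic, FLG/BD, and the descriptor checksum
            KafkaLZ4BlockInputStream in = new KafkaLZ4BlockInputStream(
                new ByteArrayInputStream(sink.toByteArray()));
            byte[] result = new byte[payload.length];
            int n = in.read(result, 0, result.length);
            in.close();
            System.out.println(new String(result, 0, n, "UTF-8"));
        }
    }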

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 5227b2d..65a7e43 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,7 +20,7 @@ package org.apache.kafka.common.record;
  * The compression type to use
  */
 public enum CompressionType {
-    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f);
+    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f);
 
     public final int id;
     public final String name;
@@ -42,8 +42,6 @@ public enum CompressionType {
                 return SNAPPY;
             case 3:
                 return LZ4;
-            case 4:
-                return LZ4HC;
             default:
                 throw new IllegalArgumentException("Unknown compression type id: " + id);
         }
@@ -58,8 +56,6 @@ public enum CompressionType {
             return SNAPPY;
         else if (LZ4.name.equals(name))
             return LZ4;
-        else if (LZ4HC.name.equals(name))
-            return LZ4HC;
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
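
The net effect on lookups (illustrative snippet; the enclosing forId/forName
method names are not visible in these hunks and are assumed from the full file):

    CompressionType.forName("lz4");   // returns LZ4 (id 3)
    CompressionType.forName("lz4hc"); // now throws IllegalArgumentException
    CompressionType.forId(4);         // now throws IllegalArgumentException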

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 0323f5f..d684e68 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -218,27 +218,13 @@ public class Compressor {
                     }
                 case LZ4:
                     try {
-                        Class LZ4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
-                        OutputStream stream = (OutputStream) LZ4BlockOutputStream.getConstructor(OutputStream.class)
+                        Class outputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockOutputStream");
+                        OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class)
                             .newInstance(buffer);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
                     }
-                case LZ4HC:
-                    try {
-                        Class<?> factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory");
-                        Class<?> compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor");
-                        Class<?> lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
-                        Object factory = factoryClass.getMethod("fastestInstance").invoke(null);
-                        Object compressor = factoryClass.getMethod("highCompressor").invoke(factory);
-                        OutputStream stream = (OutputStream) lz4BlockOutputStream
-                            .getConstructor(OutputStream.class, Integer.TYPE, compressorClass)
-                            .newInstance(buffer, 1 << 16, compressor);
-                        return new DataOutputStream(stream);
-                    } catch (Exception e) {
-                        throw new KafkaException(e);
-                    }
                 default:
                     throw new IllegalArgumentException("Unknown compression type: " + type);
             }
@@ -266,10 +252,9 @@ public class Compressor {
                         throw new KafkaException(e);
                     }
                 case LZ4:
-                case LZ4HC:
                     // dynamically load LZ4 class to avoid runtime dependency
                     try {
-                        Class inputStreamClass = Class.forName("net.jpountz.lz4.LZ4BlockInputStream");
+                        Class inputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockInputStream");
                         InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
                             .newInstance(buffer);
                         return new DataInputStream(stream);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index a0827f5..527dd0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -12,6 +12,9 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.regex.Matcher;
@@ -75,6 +78,34 @@ public class Utils {
     }
 
     /**
+     * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
+     * 
+     * @param in The stream to read from
+     * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+     */
+    public static int readUnsignedIntLE(InputStream in) throws IOException {
+        return (in.read() << 8*0) 
+             | (in.read() << 8*1)
+             | (in.read() << 8*2)
+             | (in.read() << 8*3);
+    }
+
+    /**
+     * Read an unsigned integer stored in little-endian format from a byte array
+     * at a given offset.
+     * 
+     * @param buffer The byte array to read from
+     * @param offset The position in buffer to read from
+     * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+     */
+    public static int readUnsignedIntLE(byte[] buffer, int offset) {
+        return (buffer[offset++] << 8*0)
+             | (buffer[offset++] << 8*1)
+             | (buffer[offset++] << 8*2)
+             | (buffer[offset]   << 8*3);
+    }
+
+    /**
      * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
      * 
      * @param buffer The buffer to write to
@@ -96,6 +127,35 @@ public class Utils {
     }
 
     /**
+     * Write an unsigned integer in little-endian format to the {@link OutputStream}.
+     * 
+     * @param out The stream to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
+        out.write(value >>> 8*0);
+        out.write(value >>> 8*1);
+        out.write(value >>> 8*2);
+        out.write(value >>> 8*3);
+    }
+
+    /**
+     * Write an unsigned integer in little-endian format to a byte array
+     * at a given offset.
+     * 
+     * @param buffer The byte array to write to
+     * @param offset The position in buffer to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
+        buffer[offset++] = (byte) (value >>> 8*0);
+        buffer[offset++] = (byte) (value >>> 8*1);
+        buffer[offset++] = (byte) (value >>> 8*2);
+        buffer[offset]   = (byte) (value >>> 8*3);
+    }
+
+
+    /**
     * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
      * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
      */
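
A small round-trip sketch of the new little-endian helpers (illustrative; note
the byte-array read does not mask sign extension, so treat results as raw bits
per the javadoc warning):

    import org.apache.kafka.common.utils.Utils;

    byte[] buf = new byte[4];
    Utils.writeUnsignedIntLE(buf, 0, 0x184D2204); // the LZ4 frame magic used above
    int v = Utils.readUnsignedIntLE(buf, 0);      // 0x184D2204 again (all bytes < 0x80)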

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/config/producer.properties
----------------------------------------------------------------------
diff --git a/config/producer.properties b/config/producer.properties
index 39d65d7..47ae3e2 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092
 # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
 producer.type=sync
 
-# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc.
-# the old config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectivally
+# specify the compression codec for all data generated: none, gzip, snappy, lz4.
+# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
 compression.codec=none
 
 # message encoder

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/core/src/main/scala/kafka/message/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index de0a0fa..9439d2b 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -24,7 +24,6 @@ object CompressionCodec {
       case GZIPCompressionCodec.codec => GZIPCompressionCodec
       case SnappyCompressionCodec.codec => SnappyCompressionCodec
       case LZ4CompressionCodec.codec => LZ4CompressionCodec
-      case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec
      case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
     }
   }
@@ -34,7 +33,6 @@ object CompressionCodec {
       case GZIPCompressionCodec.name => GZIPCompressionCodec
       case SnappyCompressionCodec.name => SnappyCompressionCodec
       case LZ4CompressionCodec.name => LZ4CompressionCodec
-      case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec
      case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name))
     }
   }
@@ -62,11 +60,6 @@ case object LZ4CompressionCodec extends CompressionCodec {
   val name = "lz4"
 }
 
-case object LZ4HCCompressionCodec extends CompressionCodec {
-  val codec = 4
-  val name = "lz4hc"
-}
-
 case object NoCompressionCodec extends CompressionCodec {
   val codec = 0
   val name = "none"

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/core/src/main/scala/kafka/message/CompressionFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index 8420e13..c721040 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -22,6 +22,8 @@ import java.util.zip.GZIPOutputStream
 import java.util.zip.GZIPInputStream
 import java.io.InputStream
 
+import org.apache.kafka.common.message.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
+
 object CompressionFactory {
   
   def apply(compressionCodec: CompressionCodec, stream: OutputStream): OutputStream = {
@@ -32,11 +34,7 @@ object CompressionFactory {
         import org.xerial.snappy.SnappyOutputStream
         new SnappyOutputStream(stream)
       case LZ4CompressionCodec =>
-        import net.jpountz.lz4.LZ4BlockOutputStream
-        new LZ4BlockOutputStream(stream)
-      case LZ4HCCompressionCodec =>
-        import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory}
-        new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor())
+        new KafkaLZ4BlockOutputStream(stream)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }
@@ -49,9 +47,8 @@ object CompressionFactory {
       case SnappyCompressionCodec => 
         import org.xerial.snappy.SnappyInputStream
         new SnappyInputStream(stream)
-      case LZ4CompressionCodec | LZ4HCCompressionCodec =>
-        import net.jpountz.lz4.LZ4BlockInputStream
-        new LZ4BlockInputStream(stream)
+      case LZ4CompressionCodec =>
+        new KafkaLZ4BlockInputStream(stream)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index b024a69..397d80d 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -113,7 +113,7 @@ object ConsoleProducer {
       .describedAs("broker-list")
       .ofType(classOf[String])
    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
-    val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'lz4hc'." +
+    val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'." +
                                                                  "If specified without value, then it defaults to 'gzip'")
                                     .withOptionalArg()
                                     .describedAs("compression-codec")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/core/src/main/scala/kafka/tools/PerfConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala
index c720029..d073acf 100644
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) {
     .defaultsTo(200)
   val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
     .withRequiredArg
-    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3, LZ4HCCompressionCodec as 4")
+    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3")
     .ofType(classOf[java.lang.Integer])
     .defaultsTo(0)
   val helpOpt = parser.accepts("help", "Print usage.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index c954851..6379f2b 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -47,7 +47,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
   private val config = new KafkaConfig(props)
 
   private val topic = "topic"
-  private val numRecords = 100
+  private val numRecords = 2000
 
   @Before
   override def setUp() {
@@ -73,6 +73,8 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
     val props = new Properties()
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
+    props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
     var producer = new KafkaProducer(props)
     val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
 
@@ -125,7 +127,6 @@ object ProducerCompressionTest {
     list.add(Array("gzip"))
     list.add(Array("snappy"))
     list.add(Array("lz4"))
-    list.add(Array("lz4hc"))
     list
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 6f0addc..0bb275d 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -32,8 +32,6 @@ class MessageCompressionTest extends JUnitSuite {
       codecs += SnappyCompressionCodec
     if(isLZ4Available)
       codecs += LZ4CompressionCodec
-    if (izLZ4HCAvailable)
-      codecs += LZ4HCCompressionCodec
     for(codec <- codecs)
       testSimpleCompressDecompress(codec)
   }
@@ -74,14 +72,4 @@ class MessageCompressionTest extends JUnitSuite {
       case e: UnsatisfiedLinkError => false
     }
   }
-
-  def izLZ4HCAvailable(): Boolean = {
-    try {
-      val lz4hc = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream(), 1 << 16, 
-        net.jpountz.lz4.LZ4Factory.fastestInstance().highCompressor())
-      true
-    } catch {
-      case e: UnsatisfiedLinkError => false
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1bab75/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 958c1a6..7b74a0d 100644
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -39,7 +39,7 @@ class MessageTest extends JUnitSuite {
   def setUp(): Unit = {
     val keys = Array(null, "key".getBytes, "".getBytes)
     val vals = Array("value".getBytes, "".getBytes, null)
-    val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec, LZ4HCCompressionCodec)
+    val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec)
     for(k <- keys; v <- vals; codec <- codecs)
       messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
   }

