Repository: kafka
Updated Branches:
refs/heads/trunk 4271ecbf0 -> 37356bfee
kafka-1493; Use a well-documented LZ4 compression format and remove redundant LZ4HC option;
patched by James Oliver; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37356bfe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37356bfe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37356bfe
Branch: refs/heads/trunk
Commit: 37356bfee09bf0dffc93d85270c342e03c36ca44
Parents: 4271ecb
Author: James Oliver <jdo500@gmail.com>
Authored: Fri Oct 17 10:07:34 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Oct 17 10:07:34 2014 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/ProducerConfig.java | 2 +-
.../message/KafkaLZ4BlockInputStream.java | 233 +++++++++++
.../message/KafkaLZ4BlockOutputStream.java | 387 +++++++++++++++++++
.../kafka/common/record/CompressionType.java | 6 +-
.../apache/kafka/common/record/Compressor.java | 21 +-
.../org/apache/kafka/common/utils/Utils.java | 60 +++
config/producer.properties | 4 +-
.../scala/kafka/message/CompressionCodec.scala | 7 -
.../kafka/message/CompressionFactory.scala | 13 +-
.../scala/kafka/tools/ConsoleProducer.scala | 2 +-
.../src/main/scala/kafka/tools/PerfConfig.scala | 2 +-
.../kafka/api/ProducerCompressionTest.scala | 5 +-
.../kafka/message/MessageCompressionTest.scala | 12 -
.../scala/unit/kafka/message/MessageTest.scala | 2 +-
14 files changed, 698 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index bf4ed66..9095caf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -153,7 +153,7 @@ public class ProducerConfig extends AbstractConfig {
/** <code>compression.type</code> */
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
- private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>lz4hc</code>. "
+ private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. "
+ "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
/** <code>metrics.sample.window.ms</code> */
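
For context, a minimal sketch of selecting the surviving codec through the producer config; the bootstrap address is a placeholder, and the raw KafkaProducer constructor matches its use in ProducerCompressionTest below:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class Lz4ProducerConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");             // "lz4hc" is no longer accepted
            KafkaProducer producer = new KafkaProducer(props);
            producer.close();
        }
    }
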
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
new file mode 100644
index 0000000..5be72fe
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.message;
+
+import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
+import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.MAGIC;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.FLG;
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ *
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
+ */
+public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+
+ public static final String PREMATURE_EOS = "Stream ended prematurely";
+ public static final String NOT_SUPPORTED = "Stream unsupported";
+ public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
+ public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
+
+ private final LZ4SafeDecompressor decompressor;
+ private final XXHash32 checksum;
+ private final byte[] buffer;
+ private final byte[] compressedBuffer;
+ private final int maxBlockSize;
+ private FLG flg;
+ private BD bd;
+ private int bufferOffset;
+ private int bufferSize;
+ private boolean finished;
+
+ /**
+ * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+ *
+ * @param in The stream to decompress
+ * @throws IOException
+ */
+ public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+ super(in);
+ decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+ checksum = XXHashFactory.fastestInstance().hash32();
+ readHeader();
+ maxBlockSize = bd.getBlockMaximumSize();
+ buffer = new byte[maxBlockSize];
+ compressedBuffer = new byte[maxBlockSize];
+ bufferOffset = 0;
+ bufferSize = 0;
+ finished = false;
+ }
+
+ /**
+ * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+ *
+ * @throws IOException
+ */
+ private void readHeader() throws IOException {
+ byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
+
+ // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
+ bufferOffset = 6;
+ if (in.read(header, 0, bufferOffset) != bufferOffset) {
+ throw new IOException(PREMATURE_EOS);
+ }
+
+ if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset-6)) {
+ throw new IOException(NOT_SUPPORTED);
+ }
+ flg = FLG.fromByte(header[bufferOffset-2]);
+ bd = BD.fromByte(header[bufferOffset-1]);
+ // TODO read uncompressed content size, update flg.validate()
+ // TODO read dictionary id, update flg.validate()
+
+ // check stream descriptor hash
+ byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
+ header[bufferOffset++] = (byte) in.read();
+ if (hash != header[bufferOffset-1]) {
+ throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+ }
+ }
+
+ /**
+ * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum,
+ * and writes the result to a buffer.
+ *
+ * @throws IOException
+ */
+ private void readBlock() throws IOException {
+ int blockSize = Utils.readUnsignedIntLE(in);
+
+ // Check for EndMark
+ if (blockSize == 0) {
+ finished = true;
+ // TODO implement content checksum, update flg.validate()
+ return;
+ } else if (blockSize > maxBlockSize) {
+ throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
+ }
+
+ boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+ byte[] bufferToRead;
+ if (compressed) {
+ bufferToRead = compressedBuffer;
+ } else {
+ blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
+ bufferToRead = buffer;
+ bufferSize = blockSize;
+ }
+
+ if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+ throw new IOException(PREMATURE_EOS);
+ }
+
+ // verify checksum
+ if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+ throw new IOException(BLOCK_HASH_MISMATCH);
+ }
+
+ if (compressed) {
+ try {
+ bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+ } catch (LZ4Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ bufferOffset = 0;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (finished) {
+ return -1;
+ }
+ if (available() == 0) {
+ readBlock();
+ }
+ if (finished) {
+ return -1;
+ }
+ int value = buffer[bufferOffset++] & 0xFF;
+
+ return value;
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ net.jpountz.util.Utils.checkRange(b, off, len);
+ if (finished) {
+ return -1;
+ }
+ if (available() == 0) {
+ readBlock();
+ }
+ if (finished) {
+ return -1;
+ }
+ len = Math.min(len, available());
+ System.arraycopy(buffer, bufferOffset, b, off, len);
+ bufferOffset += len;
+ return len;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (finished) {
+ return 0;
+ }
+ if (available() == 0) {
+ readBlock();
+ }
+ if (finished) {
+ return 0;
+ }
+ n = Math.min(n, available());
+ bufferOffset += n;
+ return n;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return bufferSize - bufferOffset;
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new RuntimeException("mark not supported");
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new RuntimeException("reset not supported");
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+}
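
A hedged usage sketch for the new input stream; framedBytes stands for any complete frame written by KafkaLZ4BlockOutputStream (a runnable round trip follows that class below):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.kafka.common.message.KafkaLZ4BlockInputStream;

    public class Lz4DecompressSketch {
        // framedBytes is assumed to hold a complete LZ4 frame
        static byte[] decompress(byte[] framedBytes) throws IOException {
            KafkaLZ4BlockInputStream in = new KafkaLZ4BlockInputStream(new ByteArrayInputStream(framedBytes));
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            // read() returns -1 once the 0-length end mark has been consumed
            while ((n = in.read(chunk, 0, chunk.length)) != -1)
                out.write(chunk, 0, n);
            in.close();
            return out.toByteArray();
        }
    }
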
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
new file mode 100644
index 0000000..e5b9e43
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.message;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ *
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
+ */
+public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+
+ public static final int MAGIC = 0x184D2204;
+ public static final int LZ4_MAX_HEADER_LENGTH = 19;
+ public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
+
+ public static final String CLOSED_STREAM = "The stream is already closed";
+
+ public static final int BLOCKSIZE_64KB = 4;
+ public static final int BLOCKSIZE_256KB = 5;
+ public static final int BLOCKSIZE_1MB = 6;
+ public static final int BLOCKSIZE_4MB = 7;
+
+ private final LZ4Compressor compressor;
+ private final XXHash32 checksum;
+ private final FLG flg;
+ private final BD bd;
+ private final byte[] buffer;
+ private final byte[] compressedBuffer;
+ private final int maxBlockSize;
+ private int bufferOffset;
+ private boolean finished;
+
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The output stream to compress
+ * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
+ * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and
appended to the stream for every block of data
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+ super(out);
+ compressor = LZ4Factory.fastestInstance().fastCompressor();
+ checksum = XXHashFactory.fastestInstance().hash32();
+ bd = new BD(blockSize);
+ flg = new FLG(blockChecksum);
+ bufferOffset = 0;
+ maxBlockSize = bd.getBlockMaximumSize();
+ buffer = new byte[maxBlockSize];
+ compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
+ finished = false;
+ writeHeader();
+ }
+
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The stream to compress
+ * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
+ this(out, blockSize, false);
+ }
+
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The output stream to compress
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
+ this(out, BLOCKSIZE_64KB);
+ }
+
+ /**
+ * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
+ *
+ * @throws IOException
+ */
+ private void writeHeader() throws IOException {
+ Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
+ bufferOffset = 4;
+ buffer[bufferOffset++] = flg.toByte();
+ buffer[bufferOffset++] = bd.toByte();
+ // TODO write uncompressed content size, update flg.validate()
+ // TODO write dictionary id, update flg.validate()
+ // compute checksum on all descriptor fields
+ int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
+ buffer[bufferOffset++] = (byte) hash;
+ // write out frame descriptor
+ out.write(buffer, 0, bufferOffset);
+ bufferOffset = 0;
+ }
+
+ /**
+ * Compresses buffered data, optionally computes an XXHash32 checksum, and writes
+ * the result to the underlying {@link OutputStream}.
+ *
+ * @throws IOException
+ */
+ private void writeBlock() throws IOException {
+ if (bufferOffset == 0) {
+ return;
+ }
+
+ int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
+ byte[] bufferToWrite = compressedBuffer;
+ int compressMethod = 0;
+
+ // Store block uncompressed if compressed length is greater (incompressible)
+ if (compressedLength >= bufferOffset) {
+ bufferToWrite = buffer;
+ compressedLength = bufferOffset;
+ compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
+ }
+
+ // Write content
+ Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+ out.write(bufferToWrite, 0, compressedLength);
+
+ // Calculate and write block checksum
+ if (flg.isBlockChecksumSet()) {
+ int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
+ Utils.writeUnsignedIntLE(out, hash);
+ }
+ bufferOffset = 0;
+ }
+
+ /**
+ * Similar to the {@link #writeBlock()} method. Writes a 0-length block
+ * (without block checksum) to signal the end of the block stream.
+ *
+ * @throws IOException
+ */
+ private void writeEndMark() throws IOException {
+ Utils.writeUnsignedIntLE(out, 0);
+ // TODO implement content checksum, update flg.validate()
+ finished = true;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ ensureNotFinished();
+ if (bufferOffset == maxBlockSize) {
+ writeBlock();
+ }
+ buffer[bufferOffset++] = (byte) b;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ net.jpountz.util.Utils.checkRange(b, off, len);
+ ensureNotFinished();
+
+ int bufferRemainingLength = maxBlockSize - bufferOffset;
+ // while b will fill the buffer
+ while (len > bufferRemainingLength) {
+ // fill remaining space in buffer
+ System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
+ bufferOffset = maxBlockSize;
+ writeBlock();
+ // compute new offset and length
+ off += bufferRemainingLength;
+ len -= bufferRemainingLength;
+ bufferRemainingLength = maxBlockSize;
+ }
+
+ System.arraycopy(b, off, buffer, bufferOffset, len);
+ bufferOffset += len;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (!finished) {
+ writeBlock();
+ }
+ if (out != null) {
+ out.flush();
+ }
+ }
+
+ /**
+ * A simple state check to ensure the stream is still open.
+ */
+ private void ensureNotFinished() {
+ if (finished) {
+ throw new IllegalStateException(CLOSED_STREAM);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!finished) {
+ writeEndMark();
+ flush();
+ finished = true;
+ }
+ if (out != null) {
+ out.close();
+ out = null;
+ }
+ }
+
+ public static class FLG {
+
+ private static final int VERSION = 1;
+
+ private final int presetDictionary;
+ private final int reserved1;
+ private final int contentChecksum;
+ private final int contentSize;
+ private final int blockChecksum;
+ private final int blockIndependence;
+ private final int version;
+
+ public FLG() {
+ this(false);
+ }
+
+ public FLG(boolean blockChecksum) {
+ this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
+ }
+
+ private FLG(int presetDictionary, int reserved1, int contentChecksum,
+ int contentSize, int blockChecksum, int blockIndependence, int version) {
+ this.presetDictionary = presetDictionary;
+ this.reserved1 = reserved1;
+ this.contentChecksum = contentChecksum;
+ this.contentSize = contentSize;
+ this.blockChecksum = blockChecksum;
+ this.blockIndependence = blockIndependence;
+ this.version = version;
+ validate();
+ }
+
+ public static FLG fromByte(byte flg) {
+ int presetDictionary = (flg >>> 0) & 1;
+ int reserved1 = (flg >>> 1) & 1;
+ int contentChecksum = (flg >>> 2) & 1;
+ int contentSize = (flg >>> 3) & 1;
+ int blockChecksum = (flg >>> 4) & 1;
+ int blockIndependence = (flg >>> 5) & 1;
+ int version = (flg >>> 6) & 3;
+
+ return new FLG(presetDictionary, reserved1, contentChecksum,
+ contentSize, blockChecksum, blockIndependence, version);
+ }
+
+ public byte toByte() {
+ return (byte) (
+ ((presetDictionary & 1) << 0)
+ | ((reserved1 & 1) << 1)
+ | ((contentChecksum & 1) << 2)
+ | ((contentSize & 1) << 3)
+ | ((blockChecksum & 1) << 4)
+ | ((blockIndependence & 1) << 5)
+ | ((version & 3) << 6) );
+ }
+
+ private void validate() {
+ if (presetDictionary != 0) {
+ throw new RuntimeException("Preset dictionary is unsupported");
+ }
+ if (reserved1 != 0) {
+ throw new RuntimeException("Reserved1 field must be 0");
+ }
+ if (contentChecksum != 0) {
+ throw new RuntimeException("Content checksum is unsupported");
+ }
+ if (contentSize != 0) {
+ throw new RuntimeException("Content size is unsupported");
+ }
+ if (blockIndependence != 1) {
+ throw new RuntimeException("Dependent block stream is unsupported");
+ }
+ if (version != VERSION) {
+ throw new RuntimeException(String.format("Version %d is unsupported", version));
+ }
+ }
+
+ public boolean isPresetDictionarySet() {
+ return presetDictionary == 1;
+ }
+
+ public boolean isContentChecksumSet() {
+ return contentChecksum == 1;
+ }
+
+ public boolean isContentSizeSet() {
+ return contentSize == 1;
+ }
+
+ public boolean isBlockChecksumSet() {
+ return blockChecksum == 1;
+ }
+
+ public boolean isBlockIndependenceSet() {
+ return blockIndependence == 1;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+ }
+
+ public static class BD {
+
+ private final int reserved2;
+ private final int blockSizeValue;
+ private final int reserved3;
+
+ public BD() {
+ this(0, BLOCKSIZE_64KB, 0);
+ }
+
+ public BD(int blockSizeValue) {
+ this(0, blockSizeValue, 0);
+ }
+
+ private BD(int reserved2, int blockSizeValue, int reserved3) {
+ this.reserved2 = reserved2;
+ this.blockSizeValue = blockSizeValue;
+ this.reserved3 = reserved3;
+ validate();
+ }
+
+ public static BD fromByte(byte bd) {
+ int reserved2 = (bd >>> 0) & 15;
+ int blockMaximumSize = (bd >>> 4) & 7;
+ int reserved3 = (bd >>> 7) & 1;
+
+ return new BD(reserved2, blockMaximumSize, reserved3);
+ }
+
+ private void validate() {
+ if (reserved2 != 0) {
+ throw new RuntimeException("Reserved2 field must be 0");
+ }
+ if (blockSizeValue < 4 || blockSizeValue > 7) {
+ throw new RuntimeException("Block size value must be between 4 and 7");
+ }
+ if (reserved3 != 0) {
+ throw new RuntimeException("Reserved3 field must be 0");
+ }
+ }
+
+ // 2^(2n+8)
+ public int getBlockMaximumSize() {
+ return (1 << ((2 * blockSizeValue) + 8));
+ }
+
+ public byte toByte() {
+ return (byte) (
+ ((reserved2 & 15) << 0)
+ | ((blockSizeValue & 7) << 4)
+ | ((reserved3 & 1) << 7) );
+ }
+ }
+
+}
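
A self-contained round trip through both new streams (class name and payload are mine). BLOCKSIZE_64KB = 4 plugs into BD's 2^(2n+8) formula to give 2^16 = 65536-byte blocks; the explicit flush() before close() matches how Compressor's DataOutputStream wrapper drives the stream:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.kafka.common.message.KafkaLZ4BlockInputStream;
    import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream;

    public class Lz4RoundTrip {
        public static void main(String[] args) throws IOException {
            byte[] payload = "kafka-1493 lz4 framing round trip".getBytes("UTF-8");

            // compress with 64KB blocks and no per-block checksum
            ByteArrayOutputStream framed = new ByteArrayOutputStream();
            KafkaLZ4BlockOutputStream out =
                new KafkaLZ4BlockOutputStream(framed, KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, false);
            out.write(payload, 0, payload.length);
            out.flush();  // writes any buffered block
            out.close();  // writes the end mark and closes

            // decompress
            KafkaLZ4BlockInputStream in =
                new KafkaLZ4BlockInputStream(new ByteArrayInputStream(framed.toByteArray()));
            ByteArrayOutputStream restored = new ByteArrayOutputStream();
            byte[] chunk = new byte[256];
            int n;
            while ((n = in.read(chunk, 0, chunk.length)) != -1)
                restored.write(chunk, 0, n);
            in.close();

            System.out.println(new String(restored.toByteArray(), "UTF-8"));
        }
    }
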
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 5227b2d..65a7e43 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,7 +20,7 @@ package org.apache.kafka.common.record;
* The compression type to use
*/
public enum CompressionType {
- NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f);
+ NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f);
public final int id;
public final String name;
@@ -42,8 +42,6 @@ public enum CompressionType {
return SNAPPY;
case 3:
return LZ4;
- case 4:
- return LZ4HC;
default:
throw new IllegalArgumentException("Unknown compression type id: " + id);
}
@@ -58,8 +56,6 @@ public enum CompressionType {
return SNAPPY;
else if (LZ4.name.equals(name))
return LZ4;
- else if (LZ4HC.name.equals(name))
- return LZ4HC;
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}
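
After this change the lookup methods accept only the four remaining codecs; a quick sketch (class name is mine):

    import org.apache.kafka.common.record.CompressionType;

    public class CodecLookup {
        public static void main(String[] args) {
            System.out.println(CompressionType.forName("lz4").id);  // 3
            System.out.println(CompressionType.forId(3).name);      // lz4
            // forName("lz4hc") and forId(4) now throw IllegalArgumentException
        }
    }
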
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 0323f5f..d684e68 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -218,27 +218,13 @@ public class Compressor {
}
case LZ4:
try {
- Class LZ4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
- OutputStream stream = (OutputStream) LZ4BlockOutputStream.getConstructor(OutputStream.class)
+ Class outputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockOutputStream");
+ OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class)
.newInstance(buffer);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
- case LZ4HC:
- try {
- Class<?> factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory");
- Class<?> compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor");
- Class<?> lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
- Object factory = factoryClass.getMethod("fastestInstance").invoke(null);
- Object compressor = factoryClass.getMethod("highCompressor").invoke(factory);
- OutputStream stream = (OutputStream) lz4BlockOutputStream
- .getConstructor(OutputStream.class, Integer.TYPE, compressorClass)
- .newInstance(buffer, 1 << 16, compressor);
- return new DataOutputStream(stream);
- } catch (Exception e) {
- throw new KafkaException(e);
- }
default:
throw new IllegalArgumentException("Unknown compression type: " + type);
}
@@ -266,10 +252,9 @@ public class Compressor {
throw new KafkaException(e);
}
case LZ4:
- case LZ4HC:
// dynamically load LZ4 class to avoid runtime dependency
try {
- Class inputStreamClass = Class.forName("net.jpountz.lz4.LZ4BlockInputStream");
+ Class inputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockInputStream");
InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
.newInstance(buffer);
return new DataInputStream(stream);
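The reflective Class.forName lookup keeps lz4 an optional dependency: KafkaLZ4BlockOutputStream itself references net.jpountz classes, so nothing from the lz4 jar is loaded unless the codec is actually selected. A sketch of that pattern in isolation (class name is mine; Compressor wraps failures in KafkaException instead of printing):

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;

    public class OptionalLz4Probe {
        public static void main(String[] args) {
            try {
                Class<?> cls = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockOutputStream");
                OutputStream s = (OutputStream) cls.getConstructor(OutputStream.class)
                    .newInstance(new ByteArrayOutputStream());
                System.out.println("lz4 available: " + s.getClass().getName());
            } catch (Exception e) {
                System.out.println("lz4 unavailable: " + e);
            }
        }
    }
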
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index a0827f5..527dd0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -12,6 +12,9 @@
*/
package org.apache.kafka.common.utils;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
@@ -75,6 +78,34 @@ public class Utils {
}
/**
+ * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
+ *
+ * @param in The stream to read from
+ * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+ */
+ public static int readUnsignedIntLE(InputStream in) throws IOException {
+ return (in.read() << 8*0)
+ | (in.read() << 8*1)
+ | (in.read() << 8*2)
+ | (in.read() << 8*3);
+ }
+
+ /**
+ * Read an unsigned integer stored in little-endian format from a byte array
+ * at a given offset.
+ *
+ * @param buffer The byte array to read from
+ * @param offset The position in buffer to read from
+ * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+ */
+ public static int readUnsignedIntLE(byte[] buffer, int offset) {
+ return (buffer[offset++] << 8*0)
+ | (buffer[offset++] << 8*1)
+ | (buffer[offset++] << 8*2)
+ | (buffer[offset] << 8*3);
+ }
+
+ /**
* Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
*
* @param buffer The buffer to write to
@@ -96,6 +127,35 @@ public class Utils {
}
/**
+ * Write an unsigned integer in little-endian format to the {@link OutputStream}.
+ *
+ * @param out The stream to write to
+ * @param value The value to write
+ */
+ public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
+ out.write(value >>> 8*0);
+ out.write(value >>> 8*1);
+ out.write(value >>> 8*2);
+ out.write(value >>> 8*3);
+ }
+
+ /**
+ * Write an unsigned integer in little-endian format to a byte array
+ * at a given offset.
+ *
+ * @param buffer The byte array to write to
+ * @param offset The position in buffer to write to
+ * @param value The value to write
+ */
+ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
+ buffer[offset++] = (byte) (value >>> 8*0);
+ buffer[offset++] = (byte) (value >>> 8*1);
+ buffer[offset++] = (byte) (value >>> 8*2);
+ buffer[offset] = (byte) (value >>> 8*3);
+ }
+
+
+ /**
* Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
* java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
*/
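
A small worked example of the little-endian layout (values and class name are mine). Note the javadoc caveat: the result is a signed int carrying unsigned bits, and in the byte-array reader a high bit set in one of the three low bytes would sign-extend into the higher lanes since the shifts are unmasked; the frame magic used here round-trips cleanly:

    import org.apache.kafka.common.utils.Utils;

    public class LittleEndianDemo {
        public static void main(String[] args) {
            byte[] buf = new byte[4];
            Utils.writeUnsignedIntLE(buf, 0, 0x184D2204);  // the LZ4 frame MAGIC
            System.out.printf("%02x %02x %02x %02x%n",
                buf[0] & 0xFF, buf[1] & 0xFF, buf[2] & 0xFF, buf[3] & 0xFF);  // 04 22 4d 18
            System.out.printf("0x%08x%n", Utils.readUnsignedIntLE(buf, 0));   // 0x184d2204
        }
    }
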
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/config/producer.properties
----------------------------------------------------------------------
diff --git a/config/producer.properties b/config/producer.properties
index 39d65d7..47ae3e2 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
-# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc.
-# the old config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectivally
+# specify the compression codec for all data generated: none, gzip, snappy, lz4.
+# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/core/src/main/scala/kafka/message/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index de0a0fa..9439d2b 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -24,7 +24,6 @@ object CompressionCodec {
case GZIPCompressionCodec.codec => GZIPCompressionCodec
case SnappyCompressionCodec.codec => SnappyCompressionCodec
case LZ4CompressionCodec.codec => LZ4CompressionCodec
- case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec
case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
}
}
@@ -34,7 +33,6 @@ object CompressionCodec {
case GZIPCompressionCodec.name => GZIPCompressionCodec
case SnappyCompressionCodec.name => SnappyCompressionCodec
case LZ4CompressionCodec.name => LZ4CompressionCodec
- case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec
case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name))
}
}
@@ -62,11 +60,6 @@ case object LZ4CompressionCodec extends CompressionCodec {
val name = "lz4"
}
-case object LZ4HCCompressionCodec extends CompressionCodec {
- val codec = 4
- val name = "lz4hc"
-}
-
case object NoCompressionCodec extends CompressionCodec {
val codec = 0
val name = "none"
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/core/src/main/scala/kafka/message/CompressionFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index 8420e13..c721040 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -22,6 +22,8 @@ import java.util.zip.GZIPOutputStream
import java.util.zip.GZIPInputStream
import java.io.InputStream
+import org.apache.kafka.common.message.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
+
object CompressionFactory {
def apply(compressionCodec: CompressionCodec, stream: OutputStream): OutputStream = {
@@ -32,11 +34,7 @@ object CompressionFactory {
import org.xerial.snappy.SnappyOutputStream
new SnappyOutputStream(stream)
case LZ4CompressionCodec =>
- import net.jpountz.lz4.LZ4BlockOutputStream
- new LZ4BlockOutputStream(stream)
- case LZ4HCCompressionCodec =>
- import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory}
- new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor())
+ new KafkaLZ4BlockOutputStream(stream)
case _ =>
throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
}
@@ -49,9 +47,8 @@ object CompressionFactory {
case SnappyCompressionCodec =>
import org.xerial.snappy.SnappyInputStream
new SnappyInputStream(stream)
- case LZ4CompressionCodec | LZ4HCCompressionCodec =>
- import net.jpountz.lz4.LZ4BlockInputStream
- new LZ4BlockInputStream(stream)
+ case LZ4CompressionCodec =>
+ new KafkaLZ4BlockInputStream(stream)
case _ =>
throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index b024a69..397d80d 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -113,7 +113,7 @@ object ConsoleProducer {
.describedAs("broker-list")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
- val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'lz4hc'." +
+ val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'." +
"If specified without value, then it defaults to 'gzip'")
.withOptionalArg()
.describedAs("compression-codec")
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/core/src/main/scala/kafka/tools/PerfConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala
index c720029..d073acf 100644
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) {
.defaultsTo(200)
val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
.withRequiredArg
- .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3, LZ4HCCompressionCodec as 4")
+ .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val helpOpt = parser.accepts("help", "Print usage.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index c954851..6379f2b 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -47,7 +47,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
private val config = new KafkaConfig(props)
private val topic = "topic"
- private val numRecords = 100
+ private val numRecords = 2000
@Before
override def setUp() {
@@ -73,6 +73,8 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
+ props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
var producer = new KafkaProducer(props)
val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
@@ -125,7 +127,6 @@ object ProducerCompressionTest {
list.add(Array("gzip"))
list.add(Array("snappy"))
list.add(Array("lz4"))
- list.add(Array("lz4hc"))
list
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 6f0addc..0bb275d 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -32,8 +32,6 @@ class MessageCompressionTest extends JUnitSuite {
codecs += SnappyCompressionCodec
if(isLZ4Available)
codecs += LZ4CompressionCodec
- if (izLZ4HCAvailable)
- codecs += LZ4HCCompressionCodec
for(codec <- codecs)
testSimpleCompressDecompress(codec)
}
@@ -74,14 +72,4 @@ class MessageCompressionTest extends JUnitSuite {
case e: UnsatisfiedLinkError => false
}
}
-
- def izLZ4HCAvailable(): Boolean = {
- try {
- val lz4hc = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream(), 1 << 16,
- net.jpountz.lz4.LZ4Factory.fastestInstance().highCompressor())
- true
- } catch {
- case e: UnsatisfiedLinkError => false
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37356bfe/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 958c1a6..7b74a0d 100644
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -39,7 +39,7 @@ class MessageTest extends JUnitSuite {
def setUp(): Unit = {
val keys = Array(null, "key".getBytes, "".getBytes)
val vals = Array("value".getBytes, "".getBytes, null)
- val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec, LZ4HCCompressionCodec)
+ val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec)
for(k <- keys; v <- vals; codec <- codecs)
messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
}
|