http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
new file mode 100644
index 0000000..6deeb52
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+
+/**
+ * This {@link RecordBatch} implementation is for magic versions 0 and 1. In addition to implementing
+ * {@link RecordBatch}, it also implements {@link Record}, which exposes the duality of the old message
+ * format in its handling of compressed messages. The wrapper record is considered the record batch in this
+ * interface, while the inner records are considered the log records (though they both share the same schema).
+ *
+ * In general, this class should not be used directly. Instances of {@link Records} provide access to this
+ * class indirectly through the {@link RecordBatch} interface.
+ */
+public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch implements Record {
+
+ public abstract LegacyRecord outerRecord();
+
+ @Override
+ public long lastOffset() {
+ return offset();
+ }
+
+ @Override
+ public boolean isValid() {
+ return outerRecord().isValid();
+ }
+
+ @Override
+ public void ensureValid() {
+ outerRecord().ensureValid();
+ }
+
+ @Override
+ public int keySize() {
+ return outerRecord().keySize();
+ }
+
+ @Override
+ public boolean hasKey() {
+ return outerRecord().hasKey();
+ }
+
+ @Override
+ public ByteBuffer key() {
+ return outerRecord().key();
+ }
+
+ @Override
+ public int valueSize() {
+ return outerRecord().valueSize();
+ }
+
+ @Override
+ public boolean hasValue() {
+ return !outerRecord().hasNullValue();
+ }
+
+ @Override
+ public ByteBuffer value() {
+ return outerRecord().value();
+ }
+
+ @Override
+ public Header[] headers() {
+ return Record.EMPTY_HEADERS;
+ }
+
+ @Override
+ public boolean hasMagic(byte magic) {
+ return magic == outerRecord().magic();
+ }
+
+ @Override
+ public boolean hasTimestampType(TimestampType timestampType) {
+ return outerRecord().timestampType() == timestampType;
+ }
+
+ @Override
+ public long checksum() {
+ return outerRecord().checksum();
+ }
+
+ @Override
+ public long maxTimestamp() {
+ return timestamp();
+ }
+
+ @Override
+ public long timestamp() {
+ return outerRecord().timestamp();
+ }
+
+ @Override
+ public TimestampType timestampType() {
+ return outerRecord().timestampType();
+ }
+
+ @Override
+ public long baseOffset() {
+ return iterator().next().offset();
+ }
+
+ @Override
+ public byte magic() {
+ return outerRecord().magic();
+ }
+
+ @Override
+ public CompressionType compressionType() {
+ return outerRecord().compressionType();
+ }
+
+ @Override
+ public int sizeInBytes() {
+ return outerRecord().sizeInBytes() + LOG_OVERHEAD;
+ }
+
+ @Override
+ public Integer countOrNull() {
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "LegacyRecordBatch(" + offset() + ", " + outerRecord() + ")";
+ }
+
+ @Override
+ public void writeTo(ByteBuffer buffer) {
+ writeHeader(buffer, offset(), outerRecord().sizeInBytes());
+ buffer.put(outerRecord().buffer().duplicate());
+ }
+
+ @Override
+ public long producerId() {
+ return RecordBatch.NO_PRODUCER_ID;
+ }
+
+ @Override
+ public short producerEpoch() {
+ return RecordBatch.NO_PRODUCER_EPOCH;
+ }
+
+ @Override
+ public long sequence() {
+ return RecordBatch.NO_SEQUENCE;
+ }
+
+ @Override
+ public int baseSequence() {
+ return RecordBatch.NO_SEQUENCE;
+ }
+
+ @Override
+ public int lastSequence() {
+ return RecordBatch.NO_SEQUENCE;
+ }
+
+ @Override
+ public boolean isTransactional() {
+ return false;
+ }
+
+ @Override
+ public int partitionLeaderEpoch() {
+ return RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH;
+ }
+
+ @Override
+ public boolean isControlRecord() {
+ return false;
+ }
+
+ /**
+ * Get an iterator for the nested entries contained within this batch. Note that
+ * if the batch is not compressed, then this method will return an iterator over the
+ * shallow record only (i.e. this object).
+ * @return An iterator over the records contained within this batch
+ */
+ @Override
+ public Iterator<Record> iterator() {
+ if (isCompressed())
+ return new DeepRecordsIterator(this, false, Integer.MAX_VALUE);
+ else
+ return Collections.<Record>singletonList(this).iterator();
+ }
+
+ static void writeHeader(ByteBuffer buffer, long offset, int size) {
+ buffer.putLong(offset);
+ buffer.putInt(size);
+ }
+
+ static void writeHeader(DataOutputStream out, long offset, int size) throws IOException {
+ out.writeLong(offset);
+ out.writeInt(size);
+ }
+
+ private static final class DataLogInputStream implements LogInputStream<AbstractLegacyRecordBatch> {
+ private final DataInputStream stream;
+ protected final int maxMessageSize;
+
+ DataLogInputStream(DataInputStream stream, int maxMessageSize) {
+ this.stream = stream;
+ this.maxMessageSize = maxMessageSize;
+ }
+
+ public AbstractLegacyRecordBatch nextBatch() throws IOException {
+ try {
+ long offset = stream.readLong();
+ int size = stream.readInt();
+ if (size < LegacyRecord.RECORD_OVERHEAD_V0)
+ throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
+ if (size > maxMessageSize)
+ throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+
+ byte[] recordBuffer = new byte[size];
+ stream.readFully(recordBuffer, 0, size);
+ ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
+ return new BasicLegacyRecordBatch(offset, new LegacyRecord(buf));
+ } catch (EOFException e) {
+ return null;
+ }
+ }
+ }
+
+ private static class DeepRecordsIterator extends AbstractIterator<Record> {
+ private final ArrayDeque<AbstractLegacyRecordBatch> batches;
+ private final long absoluteBaseOffset;
+ private final byte wrapperMagic;
+
+ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) {
+ LegacyRecord wrapperRecord = wrapperEntry.outerRecord();
+ this.wrapperMagic = wrapperRecord.magic();
+
+ CompressionType compressionType = wrapperRecord.compressionType();
+ ByteBuffer wrapperValue = wrapperRecord.value();
+ if (wrapperValue == null)
+ throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
+ wrapperMagic + ")");
+
+ DataInputStream stream = new DataInputStream(compressionType.wrapForInput(
+ new ByteBufferInputStream(wrapperValue), wrapperRecord.magic()));
+ LogInputStream<AbstractLegacyRecordBatch> logStream = new DataLogInputStream(stream, maxMessageSize);
+
+ long wrapperRecordOffset = wrapperEntry.lastOffset();
+ long wrapperRecordTimestamp = wrapperRecord.timestamp();
+ this.batches = new ArrayDeque<>();
+
+ // If relative offsets are used, we need to decompress the entire message first to compute
+ // the absolute offsets. For simplicity, and because it's a format that is on its way out, we
+ // do the same for message format version 0.
+ try {
+ while (true) {
+ AbstractLegacyRecordBatch batch = logStream.nextBatch();
+ if (batch == null)
+ break;
+
+ LegacyRecord record = batch.outerRecord();
+ byte magic = record.magic();
+
+ if (ensureMatchingMagic && magic != wrapperMagic)
+ throw new InvalidRecordException("Compressed message magic " + magic +
+ " does not match wrapper magic " + wrapperMagic);
+
+ if (magic > RecordBatch.MAGIC_VALUE_V0) {
+ LegacyRecord recordWithTimestamp = new LegacyRecord(
+ record.buffer(),
+ wrapperRecordTimestamp,
+ wrapperRecord.timestampType());
+ batch = new BasicLegacyRecordBatch(batch.lastOffset(), recordWithTimestamp);
+ }
+ batches.addLast(batch);
+
+ // break early if we reach the last offset in the batch
+ if (batch.offset() == wrapperRecordOffset)
+ break;
+ }
+
+ if (batches.isEmpty())
+ throw new InvalidRecordException("Found invalid compressed record set with no inner records");
+
+ if (wrapperMagic > RecordBatch.MAGIC_VALUE_V0)
+ this.absoluteBaseOffset = wrapperRecordOffset - batches.getLast().lastOffset();
+ else
+ this.absoluteBaseOffset = -1;
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ } finally {
+ Utils.closeQuietly(stream, "records iterator stream");
+ }
+ }
+
+ @Override
+ protected Record makeNext() {
+ if (batches.isEmpty())
+ return allDone();
+
+ AbstractLegacyRecordBatch entry = batches.remove();
+
+ // Convert offset to absolute offset if needed.
+ if (absoluteBaseOffset >= 0) {
+ long absoluteOffset = absoluteBaseOffset + entry.lastOffset();
+ entry = new BasicLegacyRecordBatch(absoluteOffset, entry.outerRecord());
+ }
+
+ if (entry.isCompressed())
+ throw new InvalidRecordException("Inner messages must not be compressed");
+
+ return entry;
+ }
+ }
+
+ private static class BasicLegacyRecordBatch extends AbstractLegacyRecordBatch {
+ private final LegacyRecord record;
+ private final long offset;
+
+ private BasicLegacyRecordBatch(long offset, LegacyRecord record) {
+ this.offset = offset;
+ this.record = record;
+ }
+
+ @Override
+ public long offset() {
+ return offset;
+ }
+
+ @Override
+ public LegacyRecord outerRecord() {
+ return record;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ BasicLegacyRecordBatch that = (BasicLegacyRecordBatch) o;
+
+ return offset == that.offset &&
+ (record != null ? record.equals(that.record) : that.record == null);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = record != null ? record.hashCode() : 0;
+ result = 31 * result + (int) (offset ^ (offset >>> 32));
+ return result;
+ }
+ }
+
+ static class ByteBufferLegacyRecordBatch extends AbstractLegacyRecordBatch implements MutableRecordBatch {
+ private final ByteBuffer buffer;
+ private final LegacyRecord record;
+
+ ByteBufferLegacyRecordBatch(ByteBuffer buffer) {
+ this.buffer = buffer;
+ buffer.position(LOG_OVERHEAD);
+ this.record = new LegacyRecord(buffer.slice());
+ buffer.position(OFFSET_OFFSET);
+ }
+
+ @Override
+ public long offset() {
+ return buffer.getLong(OFFSET_OFFSET);
+ }
+
+ @Override
+ public LegacyRecord outerRecord() {
+ return record;
+ }
+
+ @Override
+ public void setLastOffset(long offset) {
+ buffer.putLong(OFFSET_OFFSET, offset);
+ }
+
+ @Override
+ public void setMaxTimestamp(TimestampType timestampType, long timestamp) {
+ if (record.magic() == RecordBatch.MAGIC_VALUE_V0)
+ throw new UnsupportedOperationException("Cannot set timestamp for a record with magic = 0");
+
+ long currentTimestamp = record.timestamp();
+ // We don't need to recompute crc if the timestamp is not updated.
+ if (record.timestampType() == timestampType && currentTimestamp == timestamp)
+ return;
+
+ setTimestampAndUpdateCrc(timestampType, timestamp);
+ }
+
+ @Override
+ public void setPartitionLeaderEpoch(int epoch) {
+ throw new UnsupportedOperationException("Magic versions prior to 2 do not support partition leader epoch");
+ }
+
+ private void setTimestampAndUpdateCrc(TimestampType timestampType, long timestamp) {
+ byte attributes = LegacyRecord.computeAttributes(magic(), compressionType(), timestampType);
+ buffer.put(LOG_OVERHEAD + LegacyRecord.ATTRIBUTES_OFFSET, attributes);
+ buffer.putLong(LOG_OVERHEAD + LegacyRecord.TIMESTAMP_OFFSET, timestamp);
+ long crc = record.computeChecksum();
+ ByteUtils.writeUnsignedInt(buffer, LOG_OVERHEAD + LegacyRecord.CRC_OFFSET, crc);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ByteBufferLegacyRecordBatch that = (ByteBufferLegacyRecordBatch) o;
+
+ return buffer != null ? buffer.equals(that.buffer) : that.buffer == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return buffer != null ? buffer.hashCode() : 0;
+ }
+ }
+
+}
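
For illustration only (this is not part of the patch): a minimal sketch of how legacy (magic 0/1) data can be consumed through the RecordBatch/Record interfaces introduced above, assuming the buffer holds readable message-set bytes such as the payload of a fetch response.

import java.nio.ByteBuffer;
import java.util.Iterator;

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

public class LegacyBatchTraversal {
    static void printAll(ByteBuffer buffer) {
        MemoryRecords records = MemoryRecords.readableRecords(buffer);
        for (RecordBatch batch : records.batches()) {
            // for magic 0/1 the batch corresponds to the old wrapper (shallow) message
            System.out.println("batch: magic=" + batch.magic() + ", lastOffset=" + batch.lastOffset());
            // iterating a compressed legacy batch walks the decompressed inner records;
            // an uncompressed batch yields only the shallow record itself
            Iterator<Record> deepRecords = batch.iterator();
            while (deepRecords.hasNext()) {
                Record record = deepRecords.next();
                System.out.println("  record: offset=" + record.offset() + ", keySize=" + record.keySize());
            }
        }
    }
}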
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
new file mode 100644
index 0000000..53245e7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+abstract class AbstractRecordBatch implements RecordBatch {
+
+ @Override
+ public long nextOffset() {
+ return lastOffset() + 1;
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return compressionType() != CompressionType.NONE;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 1548a95..decc06c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -16,6 +16,10 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -25,75 +29,158 @@ public abstract class AbstractRecords implements Records {
private final Iterable<Record> records = new Iterable<Record>() {
@Override
public Iterator<Record> iterator() {
- return new Iterator<Record>() {
- private final Iterator<? extends LogEntry> deepEntries = deepEntries().iterator();
- @Override
- public boolean hasNext() {
- return deepEntries.hasNext();
- }
- @Override
- public Record next() {
- return deepEntries.next().record();
- }
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Removal not supported");
- }
- };
+ return recordsIterator();
}
};
@Override
- public boolean hasMatchingShallowMagic(byte magic) {
- for (LogEntry entry : shallowEntries())
- if (entry.magic() != magic)
+ public boolean hasMatchingMagic(byte magic) {
+ for (RecordBatch batch : batches())
+ if (batch.magic() != magic)
return false;
return true;
}
- /**
- * Convert this message set to use the specified message format.
- */
@Override
- public Records toMessageFormat(byte toMagic, TimestampType upconvertTimestampType) {
- List<LogEntry> converted = new ArrayList<>();
- for (LogEntry entry : deepEntries())
- converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic, upconvertTimestampType)));
-
- if (converted.isEmpty()) {
- // This indicates that the message is too large, which indicates that the buffer is not large
- // enough to hold a full log entry. We just return all the bytes in the file message set.
- // Even though the message set does not have the right format version, we expect old clients
- // to raise an error to the user after reading the message size and seeing that there
- // are not enough available bytes in the response to read the full message.
- return this;
- } else {
- // We use the first message to determine the compression type for the resulting message set.
- // This could result in message sets which are either larger or smaller than the original size.
- // For example, it could end up larger if most messages were previously compressed, but
- // it just so happens that the first one is not. There is also some risk that this can
- // cause some timestamp information to be lost (e.g. if the timestamp type was changed) since
- // we are essentially merging multiple message sets. However, currently this method is only
- // used for down-conversion, so we've ignored the problem.
- CompressionType compressionType = shallowEntries().iterator().next().record().compressionType();
- return MemoryRecords.withLogEntries(compressionType, converted);
+ public boolean hasCompatibleMagic(byte magic) {
+ for (RecordBatch batch : batches())
+ if (batch.magic() > magic)
+ return false;
+ return true;
+ }
+
+ protected MemoryRecords downConvert(Iterable<? extends RecordBatch> batches, byte toMagic) {
+ // maintain the batch along with the decompressed records to avoid the need to decompress again
+ List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
+ int totalSizeEstimate = 0;
+
+ for (RecordBatch batch : batches) {
+ if (batch.magic() <= toMagic) {
+ totalSizeEstimate += batch.sizeInBytes();
+ recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
+ } else {
+ List<Record> records = Utils.toList(batch.iterator());
+ final long baseOffset;
+ if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2)
+ baseOffset = batch.baseOffset();
+ else
+ baseOffset = records.get(0).offset();
+ totalSizeEstimate += estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
+ recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, records, baseOffset));
+ }
}
+
+ ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
+ for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
+ if (recordBatchAndRecords.batch.magic() <= toMagic)
+ recordBatchAndRecords.batch.writeTo(buffer);
+ else
+ buffer = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
+ }
+
+ buffer.flip();
+ return MemoryRecords.readableRecords(buffer);
}
- public static int estimatedSize(CompressionType compressionType, Iterable<LogEntry> entries) {
- int size = 0;
- for (LogEntry entry : entries)
- size += entry.sizeInBytes();
- // NOTE: 1024 is the minimum block size for snappy encoding
- return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
+ /**
+ * Return a buffer containing the converted record batches. The returned buffer may not be the same as the received
+ * one (e.g. it may require expansion).
+ */
+ private ByteBuffer convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
+ RecordBatch batch = recordBatchAndRecords.batch;
+ final TimestampType timestampType = batch.timestampType();
+ long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
+ timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
+ for (Record record : recordBatchAndRecords.records) {
+ // control messages are only supported in v2 and above, so skip when down-converting
+ if (magic < RecordBatch.MAGIC_VALUE_V2 && record.isControlRecord())
+ continue;
+ builder.append(record);
+ }
+
+ builder.close();
+ return builder.buffer();
}
/**
* Get an iterator over the deep records.
* @return An iterator over the records
*/
+ @Override
public Iterable<Record> records() {
return records;
}
+ private Iterator<Record> recordsIterator() {
+ return new AbstractIterator<Record>() {
+ private final Iterator<? extends RecordBatch> batches = batches().iterator();
+ private Iterator<Record> records;
+
+ @Override
+ protected Record makeNext() {
+ if (records != null && records.hasNext())
+ return records.next();
+
+ if (batches.hasNext()) {
+ records = batches.next().iterator();
+ return makeNext();
+ }
+
+ return allDone();
+ }
+ };
+ }
+
+ public static int estimateSizeInBytes(byte magic,
+ long baseOffset,
+ CompressionType compressionType,
+ Iterable<Record> records) {
+ int size = 0;
+ if (magic <= RecordBatch.MAGIC_VALUE_V1) {
+ for (Record record : records)
+ size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
+ } else {
+ size = DefaultRecordBatch.sizeInBytes(baseOffset, records);
+ }
+ return estimateCompressedSizeInBytes(size, compressionType);
+ }
+
+ public static int estimateSizeInBytes(byte magic,
+ CompressionType compressionType,
+ Iterable<SimpleRecord> records) {
+ int size = 0;
+ if (magic <= RecordBatch.MAGIC_VALUE_V1) {
+ for (SimpleRecord record : records)
+ size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
+ } else {
+ size = DefaultRecordBatch.sizeInBytes(records);
+ }
+ return estimateCompressedSizeInBytes(size, compressionType);
+ }
+
+ private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
+ return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
+ }
+
+ public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value) {
+ if (magic >= RecordBatch.MAGIC_VALUE_V2)
+ return DefaultRecordBatch.batchSizeUpperBound(key, value, Record.EMPTY_HEADERS);
+ else
+ return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
+ }
+
+ private static class RecordBatchAndRecords {
+ private final RecordBatch batch;
+ private final List<Record> records;
+ private final Long baseOffset;
+
+ private RecordBatchAndRecords(RecordBatch batch, List<Record> records, Long baseOffset) {
+ this.batch = batch;
+ this.records = records;
+ this.baseOffset = baseOffset;
+ }
+ }
+
}
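
As a hedged illustration (not part of the patch), the public size-estimation helpers added above could be used to pre-size a buffer before building a message set; the key/value bytes here are invented for the example.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.RecordBatch;

public class SizeEstimateExample {
    public static void main(String[] args) {
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        byte[] value = "value".getBytes(StandardCharsets.UTF_8);

        // Upper bound for a single record: the legacy path charges LOG_OVERHEAD plus the
        // fixed v0/v1 record size, while the v2 path charges the batch overhead plus the
        // worst-case varint-encoded record fields.
        int v1Bound = AbstractRecords.sizeInBytesUpperBound(RecordBatch.MAGIC_VALUE_V1, key, value);
        int v2Bound = AbstractRecords.sizeInBytesUpperBound(RecordBatch.MAGIC_VALUE_V2, key, value);
        System.out.println("v1 upper bound: " + v1Bound + ", v2 upper bound: " + v2Bound);
    }
}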
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index f4a3da4..bd54e40 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -17,19 +17,19 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.utils.ByteUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
-import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
+import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
/**
* A byte buffer backed log input stream. This class avoids the need to copy records by returning
* slices from the underlying byte buffer.
*/
-class ByteBufferLogInputStream implements LogInputStream<ByteBufferLogInputStream.ByteBufferLogEntry> {
+class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
private final ByteBuffer buffer;
private final int maxMessageSize;
@@ -38,80 +38,35 @@ class ByteBufferLogInputStream implements LogInputStream<ByteBufferLogInputStrea
this.maxMessageSize = maxMessageSize;
}
- public ByteBufferLogEntry nextEntry() throws IOException {
+ public MutableRecordBatch nextBatch() throws IOException {
int remaining = buffer.remaining();
if (remaining < LOG_OVERHEAD)
return null;
- int recordSize = buffer.getInt(buffer.position() + Records.SIZE_OFFSET);
- if (recordSize < Record.RECORD_OVERHEAD_V0)
- throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
+ int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
+ // V0 has the smallest overhead, stricter checking is done later
+ if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
+ throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
if (recordSize > maxMessageSize)
throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
- int entrySize = recordSize + LOG_OVERHEAD;
- if (remaining < entrySize)
+ int batchSize = recordSize + LOG_OVERHEAD;
+ if (remaining < batchSize)
return null;
- ByteBuffer entrySlice = buffer.slice();
- entrySlice.limit(entrySize);
- buffer.position(buffer.position() + entrySize);
- return new ByteBufferLogEntry(entrySlice);
- }
-
- public static class ByteBufferLogEntry extends LogEntry {
- private final ByteBuffer buffer;
- private final Record record;
-
- private ByteBufferLogEntry(ByteBuffer buffer) {
- this.buffer = buffer;
- buffer.position(LOG_OVERHEAD);
- this.record = new Record(buffer.slice());
- buffer.position(OFFSET_OFFSET);
- }
-
- @Override
- public long offset() {
- return buffer.getLong(OFFSET_OFFSET);
- }
-
- @Override
- public Record record() {
- return record;
- }
-
- public void setOffset(long offset) {
- buffer.putLong(OFFSET_OFFSET, offset);
- }
-
- public void setCreateTime(long timestamp) {
- if (record.magic() == Record.MAGIC_VALUE_V0)
- throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0");
-
- long currentTimestamp = record.timestamp();
- // We don't need to recompute crc if the timestamp is not updated.
- if (record.timestampType() == TimestampType.CREATE_TIME && currentTimestamp == timestamp)
- return;
- setTimestampAndUpdateCrc(TimestampType.CREATE_TIME, timestamp);
- }
+ byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
- public void setLogAppendTime(long timestamp) {
- if (record.magic() == Record.MAGIC_VALUE_V0)
- throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0");
- setTimestampAndUpdateCrc(TimestampType.LOG_APPEND_TIME, timestamp);
- }
+ ByteBuffer batchSlice = buffer.slice();
+ batchSlice.limit(batchSize);
+ buffer.position(buffer.position() + batchSize);
- private void setTimestampAndUpdateCrc(TimestampType timestampType, long timestamp) {
- byte attributes = record.attributes();
- buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, timestampType.updateAttributes(attributes));
- buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp);
- long crc = record.computeChecksum();
- ByteUtils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
- }
+ if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
+ throw new CorruptRecordException("Invalid magic found in record: " + magic);
- public ByteBuffer buffer() {
- return buffer;
- }
+ if (magic > RecordBatch.MAGIC_VALUE_V1)
+ return new DefaultRecordBatch(batchSlice);
+ else
+ return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index d88c530..093d5b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -87,7 +87,7 @@ public enum CompressionType {
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
try {
return (OutputStream) LZ4_OUTPUT_STREAM_SUPPLIER.get().newInstance(buffer,
- messageVersion == Record.MAGIC_VALUE_V0);
+ messageVersion == RecordBatch.MAGIC_VALUE_V0);
} catch (Exception e) {
throw new KafkaException(e);
}
@@ -97,7 +97,7 @@ public enum CompressionType {
public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
try {
return (InputStream) LZ4_INPUT_STREAM_SUPPLIER.get().newInstance(buffer,
- messageVersion == Record.MAGIC_VALUE_V0);
+ messageVersion == RecordBatch.MAGIC_VALUE_V0);
} catch (Exception e) {
throw new KafkaException(e);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
new file mode 100644
index 0000000..6bd614a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Control records specify a schema for the record key which includes a version and type:
+ *
+ * Key => Version Type
+ * Version => Int16
+ * Type => Int16
+ *
+ * In the future, the version can be bumped to indicate a new schema, but it must be backwards compatible
+ * with the current schema. In general, this means we can add new fields, but we cannot remove old ones.
+ *
+ * Note that control records are not considered for compaction by the log cleaner.
+ *
+ * The schema for the value field is left to the control record type to specify.
+ */
+public enum ControlRecordType {
+ COMMIT((short) 0),
+ ABORT((short) 1),
+
+ // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
+ UNKNOWN((short) -1);
+
+ private static final Logger log = LoggerFactory.getLogger(ControlRecordType.class);
+
+ static final short CURRENT_CONTROL_RECORD_KEY_VERSION = 0;
+ private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new Schema(
+ new Field("version", Type.INT16),
+ new Field("type", Type.INT16));
+
+ final short type;
+
+ ControlRecordType(short type) {
+ this.type = type;
+ }
+
+ public Struct recordKey() {
+ if (this == UNKNOWN)
+ throw new IllegalArgumentException("Cannot serialize UNKNOWN control record type");
+
+ Struct struct = new Struct(CONTROL_RECORD_KEY_SCHEMA_VERSION_V0);
+ struct.set("version", CURRENT_CONTROL_RECORD_KEY_VERSION);
+ struct.set("type", type);
+ return struct;
+ }
+
+ public static ControlRecordType parse(ByteBuffer key) {
+ short version = key.getShort(0);
+ if (version != CURRENT_CONTROL_RECORD_KEY_VERSION)
+ log.debug("Received unknown control record key version {}. Parsing as version {}" +
+ version, CURRENT_CONTROL_RECORD_KEY_VERSION);
+ short type = key.getShort(2);
+ switch (type) {
+ case 0:
+ return COMMIT;
+ case 1:
+ return ABORT;
+ default:
+ return UNKNOWN;
+ }
+ }
+}
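
For illustration (not part of the patch), a sketch of serializing a control record key via recordKey() and reading the type back with parse(); the buffer handling is just for demonstration and assumes the usual Struct.sizeOf()/writeTo() protocol-type helpers.

import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.ControlRecordType;

public class ControlRecordKeyExample {
    public static void main(String[] args) {
        // Build the versioned key struct for a COMMIT marker
        Struct keyStruct = ControlRecordType.COMMIT.recordKey();

        // Serialize it into a buffer sized exactly for the key schema
        ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
        keyStruct.writeTo(key);
        key.flip();

        // parse() uses absolute reads of the version (offset 0) and type (offset 2)
        ControlRecordType parsed = ControlRecordType.parse(key);
        System.out.println(parsed); // COMMIT
    }
}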
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
new file mode 100644
index 0000000..7c9b6f1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Crc32;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
+import static org.apache.kafka.common.utils.Utils.wrapNullable;
+
+/**
+ * This class implements the inner record format for magic 2 and above. The schema is as follows:
+ *
+ *
+ * Record =>
+ * Length => Varint
+ * Attributes => Int8
+ * TimestampDelta => Varlong
+ * OffsetDelta => Varint
+ * Key => Bytes
+ * Value => Bytes
+ * Headers => [HeaderKey HeaderValue]
+ * HeaderKey => String
+ * HeaderValue => Bytes
+ *
+ * Note that in this schema, the Bytes and String types use a variable length integer to represent
+ * the length of the field. The array type used for the headers also uses a Varint for the number of
+ * headers.
+ *
+ * The current record attributes are depicted below:
+ *
+ * -----------------------------------
+ * | Unused (1-7) | Control Flag (0) |
+ * -----------------------------------
+ *
+ * The control flag is used to implement control records (see {@link ControlRecordType}).
+ *
+ * The offset and timestamp deltas are computed relative to the base offset and
+ * base timestamp of the batch that this record is contained in.
+ */
+public class DefaultRecord implements Record {
+
+ // excluding key, value and headers: 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
+ public static final int MAX_RECORD_OVERHEAD = 21;
+
+ private static final int CONTROL_FLAG_MASK = 0x01;
+ private static final int NULL_VARINT_SIZE_BYTES = ByteUtils.sizeOfVarint(-1);
+
+ private final int sizeInBytes;
+ private final byte attributes;
+ private final long offset;
+ private final long timestamp;
+ private final int sequence;
+ private final ByteBuffer key;
+ private final ByteBuffer value;
+ private final Header[] headers;
+ private Long checksum = null;
+
+ private DefaultRecord(int sizeInBytes,
+ byte attributes,
+ long offset,
+ long timestamp,
+ int sequence,
+ ByteBuffer key,
+ ByteBuffer value,
+ Header[] headers) {
+ this.sizeInBytes = sizeInBytes;
+ this.attributes = attributes;
+ this.offset = offset;
+ this.timestamp = timestamp;
+ this.sequence = sequence;
+ this.key = key;
+ this.value = value;
+ this.headers = headers;
+ }
+
+ @Override
+ public long offset() {
+ return offset;
+ }
+
+ @Override
+ public long sequence() {
+ return sequence;
+ }
+
+ @Override
+ public int sizeInBytes() {
+ return sizeInBytes;
+ }
+
+ @Override
+ public long timestamp() {
+ return timestamp;
+ }
+
+ public byte attributes() {
+ return attributes;
+ }
+
+ @Override
+ public long checksum() {
+ if (checksum == null)
+ checksum = computeChecksum(timestamp, key, value);
+ return checksum;
+ }
+
+ @Override
+ public boolean isValid() {
+ // new versions of the message format (2 and above) do not contain an individual record checksum;
+ // instead they are validated with the checksum at the batch level
+ return true;
+ }
+
+ @Override
+ public void ensureValid() {}
+
+ @Override
+ public int keySize() {
+ return key == null ? -1 : key.remaining();
+ }
+
+ @Override
+ public int valueSize() {
+ return value == null ? -1 : value.remaining();
+ }
+
+ @Override
+ public boolean hasKey() {
+ return key != null;
+ }
+
+ @Override
+ public ByteBuffer key() {
+ return key == null ? null : key.duplicate();
+ }
+
+ @Override
+ public boolean hasValue() {
+ return value != null;
+ }
+
+ @Override
+ public ByteBuffer value() {
+ return value == null ? null : value.duplicate();
+ }
+
+ @Override
+ public Header[] headers() {
+ return headers;
+ }
+
+ /**
+ * Write the record to `out` and return its crc.
+ */
+ public static long writeTo(DataOutputStream out,
+ boolean isControlRecord,
+ int offsetDelta,
+ long timestampDelta,
+ ByteBuffer key,
+ ByteBuffer value,
+ Header[] headers) throws IOException {
+ int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
+ ByteUtils.writeVarint(sizeInBytes, out);
+
+ byte attributes = computeAttributes(isControlRecord);
+ out.write(attributes);
+
+ ByteUtils.writeVarlong(timestampDelta, out);
+ ByteUtils.writeVarint(offsetDelta, out);
+
+ if (key == null) {
+ ByteUtils.writeVarint(-1, out);
+ } else {
+ int keySize = key.remaining();
+ ByteUtils.writeVarint(keySize, out);
+ Utils.writeTo(out, key, keySize);
+ }
+
+ if (value == null) {
+ ByteUtils.writeVarint(-1, out);
+ } else {
+ int valueSize = value.remaining();
+ ByteUtils.writeVarint(valueSize, out);
+ Utils.writeTo(out, value, valueSize);
+ }
+
+ if (headers == null)
+ throw new IllegalArgumentException("Headers cannot be null");
+
+ ByteUtils.writeVarint(headers.length, out);
+
+ for (Header header : headers) {
+ String headerKey = header.key();
+ if (headerKey == null)
+ throw new IllegalArgumentException("Invalid null header key found in headers");
+
+ byte[] utf8Bytes = Utils.utf8(headerKey);
+ ByteUtils.writeVarint(utf8Bytes.length, out);
+ out.write(utf8Bytes);
+
+ ByteBuffer headerValue = header.value();
+ if (headerValue == null) {
+ ByteUtils.writeVarint(-1, out);
+ } else {
+ int headerValueSize = headerValue.remaining();
+ ByteUtils.writeVarint(headerValueSize, out);
+ Utils.writeTo(out, headerValue, headerValueSize);
+ }
+ }
+
+ return computeChecksum(timestampDelta, key, value);
+ }
+
+ /**
+ * Write the record to `out` and return its crc.
+ */
+ public static long writeTo(ByteBuffer out,
+ boolean isControlRecord,
+ int offsetDelta,
+ long timestampDelta,
+ ByteBuffer key,
+ ByteBuffer value,
+ Header[] headers) {
+ try {
+ return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), isControlRecord, offsetDelta,
+ timestampDelta, key, value, headers);
+ } catch (IOException e) {
+ // cannot actually be raised by ByteBufferOutputStream
+ throw new IllegalStateException("Unexpected exception raised from ByteBufferOutputStream", e);
+ }
+ }
+
+ /**
+ * Compute the checksum of the record from the timestamp, key and value payloads
+ */
+ private static long computeChecksum(long timestamp,
+ ByteBuffer key,
+ ByteBuffer value) {
+ Crc32 crc = new Crc32();
+ crc.updateLong(timestamp);
+
+ if (key != null)
+ crc.update(key, key.remaining());
+
+ if (value != null)
+ crc.update(value, value.remaining());
+
+ return crc.getValue();
+ }
+
+ @Override
+ public boolean hasMagic(byte magic) {
+ return magic >= MAGIC_VALUE_V2;
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return false;
+ }
+
+ @Override
+ public boolean hasTimestampType(TimestampType timestampType) {
+ return false;
+ }
+
+ @Override
+ public boolean isControlRecord() {
+ return (attributes & CONTROL_FLAG_MASK) != 0;
+ }
+
+ public static DefaultRecord readFrom(DataInputStream input,
+ long baseOffset,
+ long baseTimestamp,
+ int baseSequence,
+ Long logAppendTime) throws IOException {
+ int sizeOfBodyInBytes = ByteUtils.readVarint(input);
+ ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
+ input.readFully(recordBuffer.array(), recordBuffer.arrayOffset(), sizeOfBodyInBytes);
+ int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
+ return readFrom(recordBuffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+ }
+
+ public static DefaultRecord readFrom(ByteBuffer buffer,
+ long baseOffset,
+ long baseTimestamp,
+ int baseSequence,
+ Long logAppendTime) {
+ int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
+ if (buffer.remaining() < sizeOfBodyInBytes)
+ return null;
+
+ int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
+ return readFrom(buffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+ }
+
+ private static DefaultRecord readFrom(ByteBuffer buffer,
+ int sizeInBytes,
+ long baseOffset,
+ long baseTimestamp,
+ int baseSequence,
+ Long logAppendTime) {
+ byte attributes = buffer.get();
+ long timestampDelta = ByteUtils.readVarlong(buffer);
+ long timestamp = baseTimestamp + timestampDelta;
+ if (logAppendTime != null)
+ timestamp = logAppendTime;
+
+ int offsetDelta = ByteUtils.readVarint(buffer);
+ long offset = baseOffset + offsetDelta;
+ int sequence = baseSequence >= 0 ? baseSequence + offsetDelta : RecordBatch.NO_SEQUENCE;
+
+ ByteBuffer key = null;
+ int keySize = ByteUtils.readVarint(buffer);
+ if (keySize >= 0) {
+ key = buffer.slice();
+ key.limit(keySize);
+ buffer.position(buffer.position() + keySize);
+ }
+
+ ByteBuffer value = null;
+ int valueSize = ByteUtils.readVarint(buffer);
+ if (valueSize >= 0) {
+ value = buffer.slice();
+ value.limit(valueSize);
+ buffer.position(buffer.position() + valueSize);
+ }
+
+ int numHeaders = ByteUtils.readVarint(buffer);
+ if (numHeaders < 0)
+ throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
+
+ if (numHeaders == 0)
+ return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, Record.EMPTY_HEADERS);
+
+ Header[] headers = new Header[numHeaders];
+ for (int i = 0; i < numHeaders; i++) {
+ int headerKeySize = ByteUtils.readVarint(buffer);
+ if (headerKeySize < 0)
+ throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+
+ String headerKey = Utils.utf8(buffer, headerKeySize);
+ buffer.position(buffer.position() + headerKeySize);
+
+ ByteBuffer headerValue = null;
+ int headerValueSize = ByteUtils.readVarint(buffer);
+ if (headerValueSize >= 0) {
+ headerValue = buffer.slice();
+ headerValue.limit(headerValueSize);
+ buffer.position(buffer.position() + headerValueSize);
+ }
+
+ headers[i] = new Header(headerKey, headerValue);
+ }
+
+ return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
+ }
+
+ private static byte computeAttributes(boolean isControlRecord) {
+ return isControlRecord ? CONTROL_FLAG_MASK : (byte) 0;
+ }
+
+ public static int sizeInBytes(int offsetDelta,
+ long timestampDelta,
+ byte[] key,
+ byte[] value) {
+ return sizeInBytes(offsetDelta, timestampDelta, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+ }
+
+ public static int sizeInBytes(int offsetDelta,
+ long timestampDelta,
+ ByteBuffer key,
+ ByteBuffer value,
+ Header[] headers) {
+ int bodySize = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
+ return bodySize + ByteUtils.sizeOfVarint(bodySize);
+ }
+
+ private static int sizeOfBodyInBytes(int offsetDelta,
+ long timestampDelta,
+ ByteBuffer key,
+ ByteBuffer value,
+ Header[] headers) {
+ int size = 1; // always one byte for attributes
+ size += ByteUtils.sizeOfVarint(offsetDelta);
+ size += ByteUtils.sizeOfVarlong(timestampDelta);
+
+ int keySize = key == null ? -1 : key.remaining();
+ int valueSize = value == null ? -1 : value.remaining();
+ size += sizeOf(keySize, valueSize, headers);
+
+ return size;
+ }
+
+ private static int sizeOf(int keySize, int valueSize, Header[] headers) {
+ int size = 0;
+ if (keySize < 0)
+ size += NULL_VARINT_SIZE_BYTES;
+ else
+ size += ByteUtils.sizeOfVarint(keySize) + keySize;
+
+ if (valueSize < 0)
+ size += NULL_VARINT_SIZE_BYTES;
+ else
+ size += ByteUtils.sizeOfVarint(valueSize) + valueSize;
+
+ if (headers == null)
+ throw new IllegalArgumentException("Headers cannot be null");
+
+ size += ByteUtils.sizeOfVarint(headers.length);
+ for (Header header : headers) {
+ String headerKey = header.key();
+ if (headerKey == null)
+ throw new IllegalArgumentException("Invalid null header key found in headers");
+
+ int headerKeySize = Utils.utf8Length(headerKey);
+ size += ByteUtils.sizeOfVarint(headerKeySize) + headerKeySize;
+
+ ByteBuffer headerValue = header.value();
+ if (headerValue == null) {
+ size += NULL_VARINT_SIZE_BYTES;
+ } else {
+ int headerValueSize = headerValue.remaining();
+ size += ByteUtils.sizeOfVarint(headerValueSize) + headerValueSize;
+ }
+ }
+ return size;
+ }
+
+ static int recordSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
+ int keySize = key == null ? -1 : key.length;
+ int valueSize = value == null ? -1 : value.length;
+ return MAX_RECORD_OVERHEAD + sizeOf(keySize, valueSize, headers);
+ }
+
+}
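
As a hedged illustration (not part of the patch), a round-trip sketch using the static writeTo/readFrom helpers above; the base offset/timestamp and deltas are invented values standing in for what DefaultRecordBatch would normally supply.

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

public class DefaultRecordRoundTrip {
    public static void main(String[] args) {
        long baseOffset = 100L;
        long baseTimestamp = 1_000_000L;
        int offsetDelta = 3;
        long timestampDelta = 25L;
        ByteBuffer key = ByteBuffer.wrap(new byte[] {1, 2, 3});
        ByteBuffer value = ByteBuffer.wrap(new byte[] {4, 5});

        // sizeInBytes includes the leading varint length, matching what writeTo emits
        int size = DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, key, value, Record.EMPTY_HEADERS);
        ByteBuffer out = ByteBuffer.allocate(size);
        DefaultRecord.writeTo(out, false, offsetDelta, timestampDelta, key, value, Record.EMPTY_HEADERS);
        out.flip();

        // readFrom resolves the deltas against the supplied base offset and timestamp
        DefaultRecord record = DefaultRecord.readFrom(out, baseOffset, baseTimestamp,
                RecordBatch.NO_SEQUENCE, null);
        System.out.println("offset=" + record.offset() + ", timestamp=" + record.timestamp()); // 103, 1000025
    }
}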
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
new file mode 100644
index 0000000..1de568e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Crc32;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+
+/**
+ * RecordBatch implementation for magic 2 and above. The schema is given below:
+ *
+ * RecordBatch =>
+ * BaseOffset => Int64
+ * Length => Int32
+ * PartitionLeaderEpoch => Int32
+ * Magic => Int8
+ * CRC => Uint32
+ * Attributes => Int16
+ * LastOffsetDelta => Int32
+ * BaseTimestamp => Int64
+ * MaxTimestamp => Int64
+ * ProducerId => Int64
+ * ProducerEpoch => Int16
+ * BaseSequence => Int32
+ * Records => [Record]
+ *
+ * Note that when compression is enabled (see attributes below), the compressed record data is serialized
+ * directly following the count of the number of records.
+ *
+ * The CRC covers the data from the attributes to the end of the batch. Note that it is located
+ * after the magic byte, which means that consumers must parse the magic byte before deciding
+ * how to interpret the bytes between the batch length and the magic byte. The partition leader
+ * epoch field is placed ahead of the CRC so that the CRC does not need to be recomputed for
+ * every batch received by the broker when this field is assigned.
+ *
+ * The current attributes are given below:
+ *
+ * -----------------------------------------------------------------------------------
+ * | Unused (5-15) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
+ * -----------------------------------------------------------------------------------
+ */
+public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
+ static final int BASE_OFFSET_OFFSET = 0;
+ static final int BASE_OFFSET_LENGTH = 8;
+ static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH;
+ static final int LENGTH_LENGTH = 4;
+ static final int PARTITION_LEADER_EPOCH_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH;
+ static final int PARTITION_LEADER_EPOCH_LENGTH = 4;
+ static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH;
+ static final int MAGIC_LENGTH = 1;
+ static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
+ static final int CRC_LENGTH = 4;
+ static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
+ static final int ATTRIBUTE_LENGTH = 2;
+ static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+ static final int LAST_OFFSET_DELTA_LENGTH = 4;
+ static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
+ static final int BASE_TIMESTAMP_LENGTH = 8;
+ static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
+ static final int MAX_TIMESTAMP_LENGTH = 8;
+ static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
+ static final int PRODUCER_ID_LENGTH = 8;
+ static final int PRODUCER_EPOCH_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_LENGTH;
+ static final int PRODUCER_EPOCH_LENGTH = 2;
+ static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH;
+ static final int BASE_SEQUENCE_LENGTH = 4;
+ static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
+ static final int RECORDS_COUNT_LENGTH = 4;
+ static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
+ public static final int RECORD_BATCH_OVERHEAD = RECORDS_OFFSET;
+
+ private static final byte COMPRESSION_CODEC_MASK = 0x07;
+ private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
+ private static final byte TIMESTAMP_TYPE_MASK = 0x08;
+
+ private final ByteBuffer buffer;
+
+ DefaultRecordBatch(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ @Override
+ public byte magic() {
+ return buffer.get(MAGIC_OFFSET);
+ }
+
+ @Override
+ public void ensureValid() {
+ if (sizeInBytes() < RECORD_BATCH_OVERHEAD)
+ throw new InvalidRecordException("Record batch is corrupt (the size " + sizeInBytes() +
+ "is smaller than the minimum allowed overhead " + RECORD_BATCH_OVERHEAD + ")");
+
+ if (!isValid())
+ throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+ + ", computed crc = " + computeChecksum() + ")");
+ }
+
+ private long baseTimestamp() {
+ return buffer.getLong(BASE_TIMESTAMP_OFFSET);
+ }
+
+ @Override
+ public long maxTimestamp() {
+ return buffer.getLong(MAX_TIMESTAMP_OFFSET);
+ }
+
+ @Override
+ public TimestampType timestampType() {
+ return (attributes() & TIMESTAMP_TYPE_MASK) == 0 ? TimestampType.CREATE_TIME : TimestampType.LOG_APPEND_TIME;
+ }
+
+ @Override
+ public long baseOffset() {
+ return buffer.getLong(BASE_OFFSET_OFFSET);
+ }
+
+ @Override
+ public long lastOffset() {
+ return baseOffset() + lastOffsetDelta();
+ }
+
+ @Override
+ public long producerId() {
+ return buffer.getLong(PRODUCER_ID_OFFSET);
+ }
+
+ @Override
+ public short producerEpoch() {
+ return buffer.getShort(PRODUCER_EPOCH_OFFSET);
+ }
+
+ @Override
+ public int baseSequence() {
+ return buffer.getInt(BASE_SEQUENCE_OFFSET);
+ }
+
+ private int lastOffsetDelta() {
+ return buffer.getInt(LAST_OFFSET_DELTA_OFFSET);
+ }
+
+ @Override
+ public int lastSequence() {
+ return baseSequence() + lastOffsetDelta();
+ }
+
+ @Override
+ public CompressionType compressionType() {
+ return CompressionType.forId(attributes() & COMPRESSION_CODEC_MASK);
+ }
+
+ @Override
+ public int sizeInBytes() {
+ return LOG_OVERHEAD + buffer.getInt(LENGTH_OFFSET);
+ }
+
+ private int count() {
+ return buffer.getInt(RECORDS_COUNT_OFFSET);
+ }
+
+ @Override
+ public Integer countOrNull() {
+ return count();
+ }
+
+ @Override
+ public void writeTo(ByteBuffer buffer) {
+ buffer.put(this.buffer.duplicate());
+ }
+
+ @Override
+ public boolean isTransactional() {
+ return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
+ }
+
+ @Override
+ public int partitionLeaderEpoch() {
+ return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
+ }
+
+ private Iterator<Record> compressedIterator() {
+ ByteBuffer buffer = this.buffer.duplicate();
+ buffer.position(RECORDS_OFFSET);
+ DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
+ new ByteBufferInputStream(buffer), magic()));
+
+ // TODO: An improvement for the consumer would be to only decompress the records
+ // we need to fill max.poll.records and leave the rest compressed.
+ int numRecords = count();
+ if (numRecords < 0)
+ throw new InvalidRecordException("Found invalid record count " + numRecords + " in magic v" +
+ magic() + " batch");
+
+ List<Record> records = new ArrayList<>(numRecords);
+ try {
+ Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
+ long baseOffset = baseOffset();
+ long baseTimestamp = baseTimestamp();
+ int baseSequence = baseSequence();
+
+ for (int i = 0; i < numRecords; i++)
+ records.add(DefaultRecord.readFrom(stream, baseOffset, baseTimestamp, baseSequence, logAppendTime));
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ } finally {
+ Utils.closeQuietly(stream, "records iterator stream");
+ }
+
+ return records.iterator();
+ }
+
+ private Iterator<Record> uncompressedIterator() {
+ final ByteBuffer buffer = this.buffer.duplicate();
+ final Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
+ final long baseOffset = baseOffset();
+ final long baseTimestamp = baseTimestamp();
+ final int baseSequence = baseSequence();
+
+ buffer.position(RECORDS_OFFSET);
+ final int totalRecords = count();
+
+ return new Iterator<Record>() {
+ int readRecords = 0;
+
+ @Override
+ public boolean hasNext() {
+ return readRecords < totalRecords;
+ }
+
+ @Override
+ public Record next() {
+ readRecords++;
+ return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public Iterator<Record> iterator() {
+ if (isCompressed())
+ return compressedIterator();
+ else
+ return uncompressedIterator();
+ }
+
+ @Override
+ public void setLastOffset(long offset) {
+ buffer.putLong(BASE_OFFSET_OFFSET, offset - lastOffsetDelta());
+ }
+
+ @Override
+ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) {
+ long currentMaxTimestamp = maxTimestamp();
+        // There is no need to recompute the crc if neither the timestamp type nor the max timestamp changes.
+ if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
+ return;
+
+ byte attributes = computeAttributes(compressionType(), timestampType, isTransactional());
+ buffer.putShort(ATTRIBUTES_OFFSET, attributes);
+ buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
+ long crc = computeChecksum();
+ ByteUtils.writeUnsignedInt(buffer, CRC_OFFSET, crc);
+ }
+
+ @Override
+ public void setPartitionLeaderEpoch(int epoch) {
+ buffer.putInt(PARTITION_LEADER_EPOCH_OFFSET, epoch);
+ }
+
+ @Override
+ public long checksum() {
+ return ByteUtils.readUnsignedInt(buffer, CRC_OFFSET);
+ }
+
+ public boolean isValid() {
+ return sizeInBytes() >= RECORD_BATCH_OVERHEAD && checksum() == computeChecksum();
+ }
+
+ private long computeChecksum() {
+ return Crc32.crc32(buffer, ATTRIBUTES_OFFSET, buffer.limit() - ATTRIBUTES_OFFSET);
+ }
+
+ private byte attributes() {
+ // note we're not using the second byte of attributes
+ return (byte) buffer.getShort(ATTRIBUTES_OFFSET);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ DefaultRecordBatch that = (DefaultRecordBatch) o;
+ return buffer != null ? buffer.equals(that.buffer) : that.buffer == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return buffer != null ? buffer.hashCode() : 0;
+ }
+
+ private static byte computeAttributes(CompressionType type, TimestampType timestampType, boolean isTransactional) {
+ if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
+ throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
+ "format v2 and above");
+
+ byte attributes = isTransactional ? TRANSACTIONAL_FLAG_MASK : 0;
+ if (type.id > 0)
+ attributes |= COMPRESSION_CODEC_MASK & type.id;
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ attributes |= TIMESTAMP_TYPE_MASK;
+ return attributes;
+ }
+
+ static void writeHeader(ByteBuffer buffer,
+ long baseOffset,
+ int lastOffsetDelta,
+ int sizeInBytes,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long baseTimestamp,
+ long maxTimestamp,
+ long pid,
+ short epoch,
+ int sequence,
+ boolean isTransactional,
+ int partitionLeaderEpoch,
+ int numRecords) {
+ if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
+ throw new IllegalArgumentException("Invalid magic value " + magic);
+ if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
+ throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);
+
+ short attributes = computeAttributes(compressionType, timestampType, isTransactional);
+
+ int position = buffer.position();
+ buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
+ buffer.putInt(position + LENGTH_OFFSET, sizeInBytes - LOG_OVERHEAD);
+ buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
+ buffer.put(position + MAGIC_OFFSET, magic);
+ buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
+ buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp);
+ buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
+ buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
+ buffer.putLong(position + PRODUCER_ID_OFFSET, pid);
+ buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch);
+ buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence);
+ buffer.putInt(position + RECORDS_COUNT_OFFSET, numRecords);
+ long crc = Crc32.crc32(buffer, ATTRIBUTES_OFFSET, sizeInBytes - ATTRIBUTES_OFFSET);
+ buffer.putInt(position + CRC_OFFSET, (int) crc);
+ }
+
+ @Override
+ public String toString() {
+ return "RecordBatch(magic: " + magic() + ", offsets: [" + baseOffset() + ", " + lastOffset() + "])";
+ }
+
+ public static int sizeInBytes(long baseOffset, Iterable<Record> records) {
+ Iterator<Record> iterator = records.iterator();
+ if (!iterator.hasNext())
+ return 0;
+
+ int size = RECORD_BATCH_OVERHEAD;
+ Long baseTimestamp = null;
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ int offsetDelta = (int) (record.offset() - baseOffset);
+ if (baseTimestamp == null)
+ baseTimestamp = record.timestamp();
+ long timestampDelta = record.timestamp() - baseTimestamp;
+ size += DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
+ record.headers());
+ }
+ return size;
+ }
+
+ public static int sizeInBytes(Iterable<SimpleRecord> records) {
+ Iterator<SimpleRecord> iterator = records.iterator();
+ if (!iterator.hasNext())
+ return 0;
+
+ int size = RECORD_BATCH_OVERHEAD;
+ int offsetDelta = 0;
+ Long baseTimestamp = null;
+ while (iterator.hasNext()) {
+ SimpleRecord record = iterator.next();
+ if (baseTimestamp == null)
+ baseTimestamp = record.timestamp();
+ long timestampDelta = record.timestamp() - baseTimestamp;
+ size += DefaultRecord.sizeInBytes(offsetDelta++, timestampDelta, record.key(), record.value(),
+ record.headers());
+ }
+ return size;
+ }
+
+ /**
+     * Get an upper bound on the size of a batch containing a single record with the given key, value, and headers.
+ */
+ static int batchSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
+ return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
+ }
+
+}
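
For reference, the attributes field handled by computeAttributes() and attributes() above uses only the low byte of the int16 attributes slot: the low three bits carry the compression codec id, bit 3 the timestamp type, and bit 4 the transactional flag. Below is a minimal standalone sketch of that packing; the mask values (0x07, 0x08, 0x10) are assumptions mirroring the constants referenced in DefaultRecordBatch, which are not shown in this hunk.

    // Standalone illustration of the v2 attributes packing, not part of this commit.
    // Mask values are assumed to match the DefaultRecordBatch constants referenced above.
    public class AttributesPackingSketch {
        static final byte COMPRESSION_CODEC_MASK = 0x07;  // low 3 bits: compression codec id
        static final byte TIMESTAMP_TYPE_MASK = 0x08;     // bit 3: 0 = CreateTime, 1 = LogAppendTime
        static final byte TRANSACTIONAL_FLAG_MASK = 0x10; // bit 4: transactional batch

        static byte pack(int compressionCodecId, boolean logAppendTime, boolean transactional) {
            byte attributes = (byte) (compressionCodecId & COMPRESSION_CODEC_MASK);
            if (logAppendTime)
                attributes |= TIMESTAMP_TYPE_MASK;
            if (transactional)
                attributes |= TRANSACTIONAL_FLAG_MASK;
            return attributes;
        }

        public static void main(String[] args) {
            // e.g. codec id 1 (gzip), log-append-time, transactional -> 0b0001_1001 = 0x19
            byte attributes = pack(1, true, true);
            System.out.printf("attributes      = 0x%02x%n", attributes);
            System.out.println("codec id        = " + (attributes & COMPRESSION_CODEC_MASK));
            System.out.println("log append time = " + ((attributes & TIMESTAMP_TYPE_MASK) != 0));
            System.out.println("transactional   = " + ((attributes & TRANSACTIONAL_FLAG_MASK) != 0));
        }
    }

Note also that the batch header stores only a last offset delta: lastOffset() and lastSequence() above recover absolute values by adding that delta to baseOffset() and baseSequence(), and setLastOffset() shifts the whole batch by rewriting just the base offset.
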
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 1461b55..59055ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -23,16 +23,19 @@ import org.apache.kafka.common.utils.Utils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.Iterator;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
/**
* A log input stream which is backed by a {@link FileChannel}.
*/
-public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelLogEntry> {
+public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> {
private int position;
private final int end;
private final FileChannel channel;
private final int maxRecordSize;
- private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
+ private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(LOG_OVERHEAD);
/**
* Create a new log input stream over the FileChannel
@@ -41,10 +44,10 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
* @param start Position in the file channel to start from
* @param end Position in the file channel not to read past
*/
- public FileLogInputStream(FileChannel channel,
- int maxRecordSize,
- int start,
- int end) {
+ FileLogInputStream(FileChannel channel,
+ int maxRecordSize,
+ int start,
+ int end) {
this.channel = channel;
this.maxRecordSize = maxRecordSize;
this.position = start;
@@ -52,8 +55,8 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
}
@Override
- public FileChannelLogEntry nextEntry() throws IOException {
- if (position + Records.LOG_OVERHEAD >= end)
+ public FileChannelRecordBatch nextBatch() throws IOException {
+ if (position + LOG_OVERHEAD >= end)
return null;
logHeaderBuffer.rewind();
@@ -63,45 +66,91 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
long offset = logHeaderBuffer.getLong();
int size = logHeaderBuffer.getInt();
- if (size < Record.RECORD_OVERHEAD_V0)
- throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", Record.RECORD_OVERHEAD_V0));
+ // V0 has the smallest overhead, stricter checking is done later
+ if (size < LegacyRecord.RECORD_OVERHEAD_V0)
+ throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0));
if (size > maxRecordSize)
throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxRecordSize));
- if (position + Records.LOG_OVERHEAD + size > end)
+ if (position + LOG_OVERHEAD + size > end)
return null;
- FileChannelLogEntry logEntry = new FileChannelLogEntry(offset, channel, position, size);
- position += logEntry.sizeInBytes();
- return logEntry;
+ FileChannelRecordBatch batch = new FileChannelRecordBatch(offset, channel, position, size);
+ position += batch.sizeInBytes();
+ return batch;
}
/**
- * Log entry backed by an underlying FileChannel. This allows iteration over the shallow log
- * entries without needing to read the record data into memory until it is needed. The downside
+     * A record batch backed by an underlying FileChannel. This allows iteration over the record batches
+ * without needing to read the record data into memory until it is needed. The downside
* is that entries will generally no longer be readable when the underlying channel is closed.
*/
- public static class FileChannelLogEntry extends LogEntry {
+ public static class FileChannelRecordBatch extends AbstractRecordBatch {
private final long offset;
private final FileChannel channel;
private final int position;
- private final int recordSize;
- private Record record = null;
+ private final int batchSize;
+ private RecordBatch underlying;
+ private Byte magic;
- private FileChannelLogEntry(long offset,
- FileChannel channel,
- int position,
- int recordSize) {
+ private FileChannelRecordBatch(long offset,
+ FileChannel channel,
+ int position,
+ int batchSize) {
this.offset = offset;
this.channel = channel;
this.position = position;
- this.recordSize = recordSize;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public long baseOffset() {
+ if (magic() >= RecordBatch.MAGIC_VALUE_V2)
+ return offset;
+
+ loadUnderlyingRecordBatch();
+ return underlying.baseOffset();
}
@Override
- public long offset() {
- return offset;
+ public CompressionType compressionType() {
+ loadUnderlyingRecordBatch();
+ return underlying.compressionType();
+ }
+
+ @Override
+ public TimestampType timestampType() {
+ loadUnderlyingRecordBatch();
+ return underlying.timestampType();
+ }
+
+ @Override
+ public long maxTimestamp() {
+ loadUnderlyingRecordBatch();
+ return underlying.maxTimestamp();
+ }
+
+ @Override
+ public long lastOffset() {
+ if (magic() < RecordBatch.MAGIC_VALUE_V2)
+ return offset;
+ else if (underlying != null)
+ return underlying.lastOffset();
+
+ try {
+ // TODO: this logic probably should be moved into DefaultRecordBatch somehow
+ // maybe we just need two separate implementations
+
+ byte[] offsetDelta = new byte[4];
+ ByteBuffer buf = ByteBuffer.wrap(offsetDelta);
+ channel.read(buf, position + DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET);
+ if (buf.hasRemaining())
+                    throw new KafkaException("Failed to read last offset delta from FileChannel " + channel);
+ return offset + buf.getInt(0);
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
}
public int position() {
@@ -110,52 +159,151 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
@Override
public byte magic() {
- if (record != null)
- return record.magic();
+ if (magic != null)
+ return magic;
+ if (underlying != null)
+ return underlying.magic();
try {
- byte[] magic = new byte[1];
- ByteBuffer buf = ByteBuffer.wrap(magic);
- Utils.readFullyOrFail(channel, buf, position + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET, "magic byte");
- return magic[0];
+ ByteBuffer buf = ByteBuffer.wrap(new byte[1]);
+ Utils.readFullyOrFail(channel, buf, position + Records.MAGIC_OFFSET, "magic byte");
+ magic = buf.get(0);
+ return magic;
} catch (IOException e) {
throw new KafkaException(e);
}
}
- /**
- * Force load the record and its data (key and value) into memory.
- * @return The resulting record
- * @throws IOException for any IO errors reading from the underlying file
- */
- private Record loadRecord() throws IOException {
- if (record != null)
- return record;
+ @Override
+ public long producerId() {
+ loadUnderlyingRecordBatch();
+ return underlying.producerId();
+ }
- ByteBuffer recordBuffer = ByteBuffer.allocate(recordSize);
- Utils.readFullyOrFail(channel, recordBuffer, position + Records.LOG_OVERHEAD, "full record");
+ @Override
+ public short producerEpoch() {
+ loadUnderlyingRecordBatch();
+ return underlying.producerEpoch();
+ }
- recordBuffer.rewind();
- record = new Record(recordBuffer);
- return record;
+ @Override
+ public int baseSequence() {
+ loadUnderlyingRecordBatch();
+ return underlying.baseSequence();
}
@Override
- public Record record() {
- if (record != null)
- return record;
+ public int lastSequence() {
+ loadUnderlyingRecordBatch();
+ return underlying.lastSequence();
+ }
+ private void loadUnderlyingRecordBatch() {
try {
- return loadRecord();
+ if (underlying != null)
+ return;
+
+ ByteBuffer batchBuffer = ByteBuffer.allocate(sizeInBytes());
+ Utils.readFullyOrFail(channel, batchBuffer, position, "full record batch");
+ batchBuffer.rewind();
+
+ byte magic = batchBuffer.get(Records.MAGIC_OFFSET);
+ if (magic > RecordBatch.MAGIC_VALUE_V1)
+ underlying = new DefaultRecordBatch(batchBuffer);
+ else
+ underlying = new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchBuffer);
} catch (IOException e) {
- throw new KafkaException(e);
+                throw new KafkaException("Failed to load record batch at position " + position + " from file channel " + channel, e);
}
}
@Override
+ public Iterator<Record> iterator() {
+ loadUnderlyingRecordBatch();
+ return underlying.iterator();
+ }
+
+ @Override
+ public boolean isValid() {
+ loadUnderlyingRecordBatch();
+ return underlying.isValid();
+ }
+
+ @Override
+ public void ensureValid() {
+ loadUnderlyingRecordBatch();
+ underlying.ensureValid();
+ }
+
+ @Override
+ public long checksum() {
+ loadUnderlyingRecordBatch();
+ return underlying.checksum();
+ }
+
+ @Override
public int sizeInBytes() {
- return Records.LOG_OVERHEAD + recordSize;
+ return LOG_OVERHEAD + batchSize;
+ }
+
+ @Override
+ public Integer countOrNull() {
+ loadUnderlyingRecordBatch();
+ return underlying.countOrNull();
+ }
+
+ @Override
+ public void writeTo(ByteBuffer buffer) {
+ try {
+ int limit = buffer.limit();
+ buffer.limit(buffer.position() + sizeInBytes());
+ Utils.readFully(channel, buffer, position);
+ buffer.limit(limit);
+ } catch (IOException e) {
+ throw new KafkaException("Failed to read record batch at position " + position + " from file channel " +
+ channel, e);
+ }
}
+ @Override
+ public boolean isTransactional() {
+ loadUnderlyingRecordBatch();
+ return underlying.isTransactional();
+ }
+
+ @Override
+ public int partitionLeaderEpoch() {
+ loadUnderlyingRecordBatch();
+ return underlying.partitionLeaderEpoch();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ FileChannelRecordBatch that = (FileChannelRecordBatch) o;
+
+ return offset == that.offset &&
+ position == that.position &&
+ batchSize == that.batchSize &&
+ (channel == null ? that.channel == null : channel.equals(that.channel));
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (offset ^ (offset >>> 32));
+ result = 31 * result + (channel != null ? channel.hashCode() : 0);
+ result = 31 * result + position;
+ result = 31 * result + batchSize;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "FileChannelRecordBatch(magic: " + magic() + ", offsets: [" + baseOffset() + ", " + lastOffset() + "])";
+ }
}
}
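
To make the lazy loading in FileChannelRecordBatch concrete, here is a rough usage sketch; it is not part of the commit. It assumes code placed in the org.apache.kafka.common.record package, since the FileLogInputStream constructor is package-private after this change, and the log file name and the 1 MB max record size are hypothetical.

    package org.apache.kafka.common.record;

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.Iterator;

    // Rough usage sketch (not part of this commit). Same-package placement, the file name,
    // and the max record size are assumptions for illustration only.
    public class FileChannelBatchReadSketch {

        public static void main(String[] args) throws IOException {
            try (FileChannel channel = FileChannel.open(Paths.get("00000000000000000000.log"),
                    StandardOpenOption.READ)) {
                FileLogInputStream logStream =
                        new FileLogInputStream(channel, 1024 * 1024, 0, (int) channel.size());

                FileLogInputStream.FileChannelRecordBatch batch;
                while ((batch = logStream.nextBatch()) != null) {
                    // Offsets and size come from the batch header; the payload is not loaded yet.
                    System.out.printf("batch [%d, %d], %d bytes%n",
                            batch.baseOffset(), batch.lastOffset(), batch.sizeInBytes());

                    // iterator() triggers loadUnderlyingRecordBatch(), which reads the full batch
                    // from the channel and decompresses it if necessary.
                    Iterator<Record> records = batch.iterator();
                    while (records.hasNext())
                        System.out.println("  record offset " + records.next().offset());
                }
            }
        }
    }

Only the inner loop forces loadUnderlyingRecordBatch(); printing offsets and sizes works from the batch header (plus, for magic v2, a single four-byte read of the last offset delta).
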