kafka-commits mailing list archives

From ij...@apache.org
Subject [10/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
Date Fri, 24 Mar 2017 19:44:03 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
new file mode 100644
index 0000000..6deeb52
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+
+/**
+ * This {@link RecordBatch} implementation is for magic versions 0 and 1. In addition to implementing
+ * {@link RecordBatch}, it also implements {@link Record}, which exposes the duality of the old message
+ * format in its handling of compressed messages. The wrapper record is considered the record batch in this
+ * interface, while the inner records are considered the log records (though they both share the same schema).
+ *
+ * In general, this class should not be used directly. Instances of {@link Records} provide access to this
+ * class indirectly through the {@link RecordBatch} interface.
+ */
+public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch implements Record {
+
+    public abstract LegacyRecord outerRecord();
+
+    @Override
+    public long lastOffset() {
+        return offset();
+    }
+
+    @Override
+    public boolean isValid() {
+        return outerRecord().isValid();
+    }
+
+    @Override
+    public void ensureValid() {
+        outerRecord().ensureValid();
+    }
+
+    @Override
+    public int keySize() {
+        return outerRecord().keySize();
+    }
+
+    @Override
+    public boolean hasKey() {
+        return outerRecord().hasKey();
+    }
+
+    @Override
+    public ByteBuffer key() {
+        return outerRecord().key();
+    }
+
+    @Override
+    public int valueSize() {
+        return outerRecord().valueSize();
+    }
+
+    @Override
+    public boolean hasValue() {
+        return !outerRecord().hasNullValue();
+    }
+
+    @Override
+    public ByteBuffer value() {
+        return outerRecord().value();
+    }
+
+    @Override
+    public Header[] headers() {
+        return Record.EMPTY_HEADERS;
+    }
+
+    @Override
+    public boolean hasMagic(byte magic) {
+        return magic == outerRecord().magic();
+    }
+
+    @Override
+    public boolean hasTimestampType(TimestampType timestampType) {
+        return outerRecord().timestampType() == timestampType;
+    }
+
+    @Override
+    public long checksum() {
+        return outerRecord().checksum();
+    }
+
+    @Override
+    public long maxTimestamp() {
+        return timestamp();
+    }
+
+    @Override
+    public long timestamp() {
+        return outerRecord().timestamp();
+    }
+
+    @Override
+    public TimestampType timestampType() {
+        return outerRecord().timestampType();
+    }
+
+    @Override
+    public long baseOffset() {
+        return iterator().next().offset();
+    }
+
+    @Override
+    public byte magic() {
+        return outerRecord().magic();
+    }
+
+    @Override
+    public CompressionType compressionType() {
+        return outerRecord().compressionType();
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return outerRecord().sizeInBytes() + LOG_OVERHEAD;
+    }
+
+    @Override
+    public Integer countOrNull() {
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return "LegacyRecordBatch(" + offset() + ", " + outerRecord() + ")";
+    }
+
+    @Override
+    public void writeTo(ByteBuffer buffer) {
+        writeHeader(buffer, offset(), outerRecord().sizeInBytes());
+        buffer.put(outerRecord().buffer().duplicate());
+    }
+
+    @Override
+    public long producerId() {
+        return RecordBatch.NO_PRODUCER_ID;
+    }
+
+    @Override
+    public short producerEpoch() {
+        return RecordBatch.NO_PRODUCER_EPOCH;
+    }
+
+    @Override
+    public long sequence() {
+        return RecordBatch.NO_SEQUENCE;
+    }
+
+    @Override
+    public int baseSequence() {
+        return RecordBatch.NO_SEQUENCE;
+    }
+
+    @Override
+    public int lastSequence() {
+        return RecordBatch.NO_SEQUENCE;
+    }
+
+    @Override
+    public boolean isTransactional() {
+        return false;
+    }
+
+    @Override
+    public int partitionLeaderEpoch() {
+        return RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH;
+    }
+
+    @Override
+    public boolean isControlRecord() {
+        return false;
+    }
+
+    /**
+     * Get an iterator for the nested entries contained within this batch. Note that
+     * if the batch is not compressed, then this method will return an iterator over the
+     * shallow record only (i.e. this object).
+     * @return An iterator over the records contained within this batch
+     */
+    @Override
+    public Iterator<Record> iterator() {
+        if (isCompressed())
+            return new DeepRecordsIterator(this, false, Integer.MAX_VALUE);
+        else
+            return Collections.<Record>singletonList(this).iterator();
+    }
+
+    static void writeHeader(ByteBuffer buffer, long offset, int size) {
+        buffer.putLong(offset);
+        buffer.putInt(size);
+    }
+
+    static void writeHeader(DataOutputStream out, long offset, int size) throws IOException {
+        out.writeLong(offset);
+        out.writeInt(size);
+    }
+
+    private static final class DataLogInputStream implements LogInputStream<AbstractLegacyRecordBatch> {
+        private final DataInputStream stream;
+        private final int maxMessageSize;
+
+        DataLogInputStream(DataInputStream stream, int maxMessageSize) {
+            this.stream = stream;
+            this.maxMessageSize = maxMessageSize;
+        }
+
+        public AbstractLegacyRecordBatch nextBatch() throws IOException {
+            try {
+                long offset = stream.readLong();
+                int size = stream.readInt();
+                if (size < LegacyRecord.RECORD_OVERHEAD_V0)
+                    throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
+                if (size > maxMessageSize)
+                    throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+
+                byte[] recordBuffer = new byte[size];
+                stream.readFully(recordBuffer, 0, size);
+                ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
+                return new BasicLegacyRecordBatch(offset, new LegacyRecord(buf));
+            } catch (EOFException e) {
+                return null;
+            }
+        }
+    }
+
+    private static class DeepRecordsIterator extends AbstractIterator<Record> {
+        private final ArrayDeque<AbstractLegacyRecordBatch> batches;
+        private final long absoluteBaseOffset;
+        private final byte wrapperMagic;
+
+        private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) {
+            LegacyRecord wrapperRecord = wrapperEntry.outerRecord();
+            this.wrapperMagic = wrapperRecord.magic();
+
+            CompressionType compressionType = wrapperRecord.compressionType();
+            ByteBuffer wrapperValue = wrapperRecord.value();
+            if (wrapperValue == null)
+                throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
+                        wrapperMagic + ")");
+
+            DataInputStream stream = new DataInputStream(compressionType.wrapForInput(
+                    new ByteBufferInputStream(wrapperValue), wrapperRecord.magic()));
+            LogInputStream<AbstractLegacyRecordBatch> logStream = new DataLogInputStream(stream, maxMessageSize);
+
+            long wrapperRecordOffset = wrapperEntry.lastOffset();
+            long wrapperRecordTimestamp = wrapperRecord.timestamp();
+            this.batches = new ArrayDeque<>();
+
+            // If relative offset is used, we need to decompress the entire message first to compute
+            // the absolute offset. For simplicity and because it's a format that is on its way out, we
+            // do the same for message format version 0
+            try {
+                while (true) {
+                    AbstractLegacyRecordBatch batch = logStream.nextBatch();
+                    if (batch == null)
+                        break;
+
+                    LegacyRecord record = batch.outerRecord();
+                    byte magic = record.magic();
+
+                    if (ensureMatchingMagic && magic != wrapperMagic)
+                        throw new InvalidRecordException("Compressed message magic " + magic +
+                                " does not match wrapper magic " + wrapperMagic);
+
+                    if (magic > RecordBatch.MAGIC_VALUE_V0) {
+                        LegacyRecord recordWithTimestamp = new LegacyRecord(
+                                record.buffer(),
+                                wrapperRecordTimestamp,
+                                wrapperRecord.timestampType());
+                        batch = new BasicLegacyRecordBatch(batch.lastOffset(), recordWithTimestamp);
+                    }
+                    batches.addLast(batch);
+
+                    // break early if we reach the last offset in the batch
+                    if (batch.offset() == wrapperRecordOffset)
+                        break;
+                }
+
+                if (batches.isEmpty())
+                    throw new InvalidRecordException("Found invalid compressed record set with no inner records");
+
+                if (wrapperMagic > RecordBatch.MAGIC_VALUE_V0)
+                    this.absoluteBaseOffset = wrapperRecordOffset - batches.getLast().lastOffset();
+                else
+                    this.absoluteBaseOffset = -1;
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            } finally {
+                Utils.closeQuietly(stream, "records iterator stream");
+            }
+        }
+
+        @Override
+        protected Record makeNext() {
+            if (batches.isEmpty())
+                return allDone();
+
+            AbstractLegacyRecordBatch entry = batches.remove();
+
+            // Convert offset to absolute offset if needed.
+            if (absoluteBaseOffset >= 0) {
+                long absoluteOffset = absoluteBaseOffset + entry.lastOffset();
+                entry = new BasicLegacyRecordBatch(absoluteOffset, entry.outerRecord());
+            }
+
+            if (entry.isCompressed())
+                throw new InvalidRecordException("Inner messages must not be compressed");
+
+            return entry;
+        }
+    }
+
+    private static class BasicLegacyRecordBatch extends AbstractLegacyRecordBatch {
+        private final LegacyRecord record;
+        private final long offset;
+
+        private BasicLegacyRecordBatch(long offset, LegacyRecord record) {
+            this.offset = offset;
+            this.record = record;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+
+        @Override
+        public LegacyRecord outerRecord() {
+            return record;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            BasicLegacyRecordBatch that = (BasicLegacyRecordBatch) o;
+
+            return offset == that.offset &&
+                    (record != null ? record.equals(that.record) : that.record == null);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = record != null ? record.hashCode() : 0;
+            result = 31 * result + (int) (offset ^ (offset >>> 32));
+            return result;
+        }
+    }
+
+    static class ByteBufferLegacyRecordBatch extends AbstractLegacyRecordBatch implements MutableRecordBatch {
+        private final ByteBuffer buffer;
+        private final LegacyRecord record;
+
+        ByteBufferLegacyRecordBatch(ByteBuffer buffer) {
+            this.buffer = buffer;
+            buffer.position(LOG_OVERHEAD);
+            this.record = new LegacyRecord(buffer.slice());
+            buffer.position(OFFSET_OFFSET);
+        }
+
+        @Override
+        public long offset() {
+            return buffer.getLong(OFFSET_OFFSET);
+        }
+
+        @Override
+        public LegacyRecord outerRecord() {
+            return record;
+        }
+
+        @Override
+        public void setLastOffset(long offset) {
+            buffer.putLong(OFFSET_OFFSET, offset);
+        }
+
+        @Override
+        public void setMaxTimestamp(TimestampType timestampType, long timestamp) {
+            if (record.magic() == RecordBatch.MAGIC_VALUE_V0)
+                throw new UnsupportedOperationException("Cannot set timestamp for a record with magic = 0");
+
+            long currentTimestamp = record.timestamp();
+            // We don't need to recompute crc if the timestamp is not updated.
+            if (record.timestampType() == timestampType && currentTimestamp == timestamp)
+                return;
+
+            setTimestampAndUpdateCrc(timestampType, timestamp);
+        }
+
+        @Override
+        public void setPartitionLeaderEpoch(int epoch) {
+            throw new UnsupportedOperationException("Magic versions prior to 2 do not support partition leader epoch");
+        }
+
+        private void setTimestampAndUpdateCrc(TimestampType timestampType, long timestamp) {
+            byte attributes = LegacyRecord.computeAttributes(magic(), compressionType(), timestampType);
+            buffer.put(LOG_OVERHEAD + LegacyRecord.ATTRIBUTES_OFFSET, attributes);
+            buffer.putLong(LOG_OVERHEAD + LegacyRecord.TIMESTAMP_OFFSET, timestamp);
+            long crc = record.computeChecksum();
+            ByteUtils.writeUnsignedInt(buffer, LOG_OVERHEAD + LegacyRecord.CRC_OFFSET, crc);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ByteBufferLegacyRecordBatch that = (ByteBufferLegacyRecordBatch) o;
+
+            return buffer != null ? buffer.equals(that.buffer) : that.buffer == null;
+        }
+
+        @Override
+        public int hashCode() {
+            return buffer != null ? buffer.hashCode() : 0;
+        }
+    }
+
+}
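
A rough sketch of how the wrapper/inner duality described in the class javadoc surfaces through the Records API touched by this patch. The dump helper itself is hypothetical, but readableRecords, batches() and the per-batch record iterator all appear in this commit:

    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;

    import java.nio.ByteBuffer;
    import java.util.Iterator;

    public class LegacyBatchDump {
        // For magic v0/v1 data, each RecordBatch is the (possibly compressed) wrapper message;
        // batch.iterator() decompresses if needed and yields the inner records with absolute
        // offsets (see DeepRecordsIterator above).
        public static void dump(ByteBuffer buffer) {
            MemoryRecords records = MemoryRecords.readableRecords(buffer);
            for (RecordBatch batch : records.batches()) {
                System.out.println("batch: magic=" + batch.magic() + " lastOffset=" + batch.lastOffset());
                Iterator<Record> it = batch.iterator();
                while (it.hasNext()) {
                    Record record = it.next();
                    System.out.println("  record: offset=" + record.offset() + " timestamp=" + record.timestamp());
                }
            }
        }
    }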

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
new file mode 100644
index 0000000..53245e7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+abstract class AbstractRecordBatch implements RecordBatch {
+
+    @Override
+    public long nextOffset() {
+        return lastOffset() + 1;
+    }
+
+    @Override
+    public boolean isCompressed() {
+        return compressionType() != CompressionType.NONE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 1548a95..decc06c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -25,75 +29,158 @@ public abstract class AbstractRecords implements Records {
     private final Iterable<Record> records = new Iterable<Record>() {
         @Override
         public Iterator<Record> iterator() {
-            return new Iterator<Record>() {
-                private final Iterator<? extends LogEntry> deepEntries = deepEntries().iterator();
-                @Override
-                public boolean hasNext() {
-                    return deepEntries.hasNext();
-                }
-                @Override
-                public Record next() {
-                    return deepEntries.next().record();
-                }
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException("Removal not supported");
-                }
-            };
+            return recordsIterator();
         }
     };
 
     @Override
-    public boolean hasMatchingShallowMagic(byte magic) {
-        for (LogEntry entry : shallowEntries())
-            if (entry.magic() != magic)
+    public boolean hasMatchingMagic(byte magic) {
+        for (RecordBatch batch : batches())
+            if (batch.magic() != magic)
                 return false;
         return true;
     }
 
-    /**
-     * Convert this message set to use the specified message format.
-     */
     @Override
-    public Records toMessageFormat(byte toMagic, TimestampType upconvertTimestampType) {
-        List<LogEntry> converted = new ArrayList<>();
-        for (LogEntry entry : deepEntries())
-            converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic, upconvertTimestampType)));
-
-        if (converted.isEmpty()) {
-            // This indicates that the message is too large, which indicates that the buffer is not large
-            // enough to hold a full log entry. We just return all the bytes in the file message set.
-            // Even though the message set does not have the right format version, we expect old clients
-            // to raise an error to the user after reading the message size and seeing that there
-            // are not enough available bytes in the response to read the full message.
-            return this;
-        } else {
-            // We use the first message to determine the compression type for the resulting message set.
-            // This could result in message sets which are either larger or smaller than the original size.
-            // For example, it could end up larger if most messages were previously compressed, but
-            // it just so happens that the first one is not. There is also some risk that this can
-            // cause some timestamp information to be lost (e.g. if the timestamp type was changed) since
-            // we are essentially merging multiple message sets. However, currently this method is only
-            // used for down-conversion, so we've ignored the problem.
-            CompressionType compressionType = shallowEntries().iterator().next().record().compressionType();
-            return MemoryRecords.withLogEntries(compressionType, converted);
+    public boolean hasCompatibleMagic(byte magic) {
+        for (RecordBatch batch : batches())
+            if (batch.magic() > magic)
+                return false;
+        return true;
+    }
+
+    protected MemoryRecords downConvert(Iterable<? extends RecordBatch> batches, byte toMagic) {
+        // maintain the batch along with the decompressed records to avoid the need to decompress again
+        List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
+        int totalSizeEstimate = 0;
+
+        for (RecordBatch batch : batches) {
+            if (batch.magic() <= toMagic) {
+                totalSizeEstimate += batch.sizeInBytes();
+                recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
+            } else {
+                List<Record> records = Utils.toList(batch.iterator());
+                final long baseOffset;
+                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2)
+                    baseOffset = batch.baseOffset();
+                else
+                    baseOffset = records.get(0).offset();
+                totalSizeEstimate += estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
+                recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, records, baseOffset));
+            }
         }
+
+        ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
+        for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
+            if (recordBatchAndRecords.batch.magic() <= toMagic)
+                recordBatchAndRecords.batch.writeTo(buffer);
+            else
+                buffer = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
+        }
+
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
     }
 
-    public static int estimatedSize(CompressionType compressionType, Iterable<LogEntry> entries) {
-        int size = 0;
-        for (LogEntry entry : entries)
-            size += entry.sizeInBytes();
-        // NOTE: 1024 is the minimum block size for snappy encoding
-        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
+    /**
+     * Return a buffer containing the converted record batches. The returned buffer may not be the same as the received
+     * one (e.g. it may require expansion).
+     */
+    private ByteBuffer convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
+        RecordBatch batch = recordBatchAndRecords.batch;
+        final TimestampType timestampType = batch.timestampType();
+        long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
+                timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
+        for (Record record : recordBatchAndRecords.records) {
+            // control messages are only supported in v2 and above, so skip when down-converting
+            if (magic < RecordBatch.MAGIC_VALUE_V2 && record.isControlRecord())
+                continue;
+            builder.append(record);
+        }
+
+        builder.close();
+        return builder.buffer();
     }
 
     /**
      * Get an iterator over the deep records.
      * @return An iterator over the records
      */
+    @Override
     public Iterable<Record> records() {
         return records;
     }
 
+    private Iterator<Record> recordsIterator() {
+        return new AbstractIterator<Record>() {
+            private final Iterator<? extends RecordBatch> batches = batches().iterator();
+            private Iterator<Record> records;
+
+            @Override
+            protected Record makeNext() {
+                if (records != null && records.hasNext())
+                    return records.next();
+
+                if (batches.hasNext()) {
+                    records = batches.next().iterator();
+                    return makeNext();
+                }
+
+                return allDone();
+            }
+        };
+    }
+
+    public static int estimateSizeInBytes(byte magic,
+                                          long baseOffset,
+                                          CompressionType compressionType,
+                                          Iterable<Record> records) {
+        int size = 0;
+        if (magic <= RecordBatch.MAGIC_VALUE_V1) {
+            for (Record record : records)
+                size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
+        } else {
+            size = DefaultRecordBatch.sizeInBytes(baseOffset, records);
+        }
+        return estimateCompressedSizeInBytes(size, compressionType);
+    }
+
+    public static int estimateSizeInBytes(byte magic,
+                                          CompressionType compressionType,
+                                          Iterable<SimpleRecord> records) {
+        int size = 0;
+        if (magic <= RecordBatch.MAGIC_VALUE_V1) {
+            for (SimpleRecord record : records)
+                size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
+        } else {
+            size = DefaultRecordBatch.sizeInBytes(records);
+        }
+        return estimateCompressedSizeInBytes(size, compressionType);
+    }
+
+    private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
+        return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
+    }
+
+    public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value) {
+        if (magic >= RecordBatch.MAGIC_VALUE_V2)
+            return DefaultRecordBatch.batchSizeUpperBound(key, value, Record.EMPTY_HEADERS);
+        else
+            return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
+    }
+
+    private static class RecordBatchAndRecords {
+        private final RecordBatch batch;
+        private final List<Record> records;
+        private final Long baseOffset;
+
+        private RecordBatchAndRecords(RecordBatch batch, List<Record> records, Long baseOffset) {
+            this.batch = batch;
+            this.records = records;
+            this.baseOffset = baseOffset;
+        }
+    }
+
 }
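
A small sketch exercising the new size-estimation helpers above: for magic v0/v1 the estimate sums LOG_OVERHEAD plus the legacy record size per record, for v2 it delegates to DefaultRecordBatch, and for compressed data the result is roughly half the uncompressed size, clamped between 1024 bytes and 64 KB. The SimpleRecord(timestamp, key, value) constructor is assumed here and not shown in this patch:

    import org.apache.kafka.common.record.AbstractRecords;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;

    import java.util.Arrays;
    import java.util.List;

    public class SizeEstimateExample {
        public static void main(String[] args) {
            // SimpleRecord(timestamp, key, value) is assumed for illustration.
            List<SimpleRecord> records = Arrays.asList(
                    new SimpleRecord(1L, "k1".getBytes(), "v1".getBytes()),
                    new SimpleRecord(2L, "k2".getBytes(), "v2".getBytes()));

            int plain = AbstractRecords.estimateSizeInBytes(
                    RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, records);
            int compressed = AbstractRecords.estimateSizeInBytes(
                    RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, records);

            // The compressed estimate is max(plain / 2, 1024), capped at 1 << 16.
            System.out.println("uncompressed estimate: " + plain);
            System.out.println("compressed estimate:   " + compressed);
        }
    }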

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index f4a3da4..bd54e40 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -17,19 +17,19 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.utils.ByteUtils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
-import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
+import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
 
 /**
  * A byte buffer backed log input stream. This class avoids the need to copy records by returning
  * slices from the underlying byte buffer.
  */
-class ByteBufferLogInputStream implements LogInputStream<ByteBufferLogInputStream.ByteBufferLogEntry> {
+class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
     private final ByteBuffer buffer;
     private final int maxMessageSize;
 
@@ -38,80 +38,35 @@ class ByteBufferLogInputStream implements LogInputStream<ByteBufferLogInputStrea
         this.maxMessageSize = maxMessageSize;
     }
 
-    public ByteBufferLogEntry nextEntry() throws IOException {
+    public MutableRecordBatch nextBatch() throws IOException {
         int remaining = buffer.remaining();
         if (remaining < LOG_OVERHEAD)
             return null;
 
-        int recordSize = buffer.getInt(buffer.position() + Records.SIZE_OFFSET);
-        if (recordSize < Record.RECORD_OVERHEAD_V0)
-            throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
+        int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
+        // V0 has the smallest overhead, stricter checking is done later
+        if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
+            throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
         if (recordSize > maxMessageSize)
             throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
 
-        int entrySize = recordSize + LOG_OVERHEAD;
-        if (remaining < entrySize)
+        int batchSize = recordSize + LOG_OVERHEAD;
+        if (remaining < batchSize)
             return null;
 
-        ByteBuffer entrySlice = buffer.slice();
-        entrySlice.limit(entrySize);
-        buffer.position(buffer.position() + entrySize);
-        return new ByteBufferLogEntry(entrySlice);
-    }
-
-    public static class ByteBufferLogEntry extends LogEntry {
-        private final ByteBuffer buffer;
-        private final Record record;
-
-        private ByteBufferLogEntry(ByteBuffer buffer) {
-            this.buffer = buffer;
-            buffer.position(LOG_OVERHEAD);
-            this.record = new Record(buffer.slice());
-            buffer.position(OFFSET_OFFSET);
-        }
-
-        @Override
-        public long offset() {
-            return buffer.getLong(OFFSET_OFFSET);
-        }
-
-        @Override
-        public Record record() {
-            return record;
-        }
-
-        public void setOffset(long offset) {
-            buffer.putLong(OFFSET_OFFSET, offset);
-        }
-
-        public void setCreateTime(long timestamp) {
-            if (record.magic() == Record.MAGIC_VALUE_V0)
-                throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0");
-
-            long currentTimestamp = record.timestamp();
-            // We don't need to recompute crc if the timestamp is not updated.
-            if (record.timestampType() == TimestampType.CREATE_TIME && currentTimestamp == timestamp)
-                return;
-            setTimestampAndUpdateCrc(TimestampType.CREATE_TIME, timestamp);
-        }
+        byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
 
-        public void setLogAppendTime(long timestamp) {
-            if (record.magic() == Record.MAGIC_VALUE_V0)
-                throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0");
-            setTimestampAndUpdateCrc(TimestampType.LOG_APPEND_TIME, timestamp);
-        }
+        ByteBuffer batchSlice = buffer.slice();
+        batchSlice.limit(batchSize);
+        buffer.position(buffer.position() + batchSize);
 
-        private void setTimestampAndUpdateCrc(TimestampType timestampType, long timestamp) {
-            byte attributes = record.attributes();
-            buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, timestampType.updateAttributes(attributes));
-            buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp);
-            long crc = record.computeChecksum();
-            ByteUtils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
-        }
+        if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
+            throw new CorruptRecordException("Invalid magic found in record: " + magic);
 
-        public ByteBuffer buffer() {
-            return buffer;
-        }
+        if (magic > RecordBatch.MAGIC_VALUE_V1)
+            return new DefaultRecordBatch(batchSlice);
+        else
+            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
     }
 
 }
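
For orientation, a sketch of how this (package-private) stream is typically driven; it would have to live in org.apache.kafka.common.record, and the two-argument constructor is inferred from the fields above rather than shown in this hunk:

    package org.apache.kafka.common.record;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    class LogInputStreamSketch {
        // Walk the batches in a buffer without copying: nextBatch() returns slices of the
        // underlying buffer and dispatches on the magic byte (legacy vs. default batches).
        static void walk(ByteBuffer buffer) throws IOException {
            ByteBufferLogInputStream stream =
                    new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE);
            MutableRecordBatch batch;
            while ((batch = stream.nextBatch()) != null)
                System.out.println("magic=" + batch.magic() + " lastOffset=" + batch.lastOffset());
        }
    }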

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index d88c530..093d5b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -87,7 +87,7 @@ public enum CompressionType {
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
             try {
                 return (OutputStream) LZ4_OUTPUT_STREAM_SUPPLIER.get().newInstance(buffer,
-                        messageVersion == Record.MAGIC_VALUE_V0);
+                        messageVersion == RecordBatch.MAGIC_VALUE_V0);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }
@@ -97,7 +97,7 @@ public enum CompressionType {
         public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
             try {
                 return (InputStream) LZ4_INPUT_STREAM_SUPPLIER.get().newInstance(buffer,
-                        messageVersion == Record.MAGIC_VALUE_V0);
+                        messageVersion == RecordBatch.MAGIC_VALUE_V0);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
new file mode 100644
index 0000000..6bd614a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Control records specify a schema for the record key which includes a version and type:
+ *
+ * Key => Version Type
+ *   Version => Int16
+ *   Type => Int16
+ *
+ * In the future, the version can be bumped to indicate a new schema, but it must be backwards compatible
+ * with the current schema. In general, this means we can add new fields, but we cannot remove old ones.
+ *
+ * Note that control records are not considered for compaction by the log cleaner.
+ *
+ * The schema for the value field is left to the control record type to specify.
+ */
+public enum ControlRecordType {
+    COMMIT((short) 0),
+    ABORT((short) 1),
+
+    // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
+    UNKNOWN((short) -1);
+
+    private static final Logger log = LoggerFactory.getLogger(ControlRecordType.class);
+
+    static final short CURRENT_CONTROL_RECORD_KEY_VERSION = 0;
+    private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new Schema(
+            new Field("version", Type.INT16),
+            new Field("type", Type.INT16));
+
+    final short type;
+
+    ControlRecordType(short type) {
+        this.type = type;
+    }
+
+    public Struct recordKey() {
+        if (this == UNKNOWN)
+            throw new IllegalArgumentException("Cannot serialize UNKNOWN control record type");
+
+        Struct struct = new Struct(CONTROL_RECORD_KEY_SCHEMA_VERSION_V0);
+        struct.set("version", CURRENT_CONTROL_RECORD_KEY_VERSION);
+        struct.set("type", type);
+        return struct;
+    }
+
+    public static ControlRecordType parse(ByteBuffer key) {
+        short version = key.getShort(0);
+        if (version != CURRENT_CONTROL_RECORD_KEY_VERSION)
+            log.debug("Received unknown control record key version {}. Parsing as version {}" +
+                    version, CURRENT_CONTROL_RECORD_KEY_VERSION);
+        short type = key.getShort(2);
+        switch (type) {
+            case 0:
+                return COMMIT;
+            case 1:
+                return ABORT;
+            default:
+                return UNKNOWN;
+        }
+    }
+}
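
A minimal round trip of the key schema documented above (Version => Int16, Type => Int16, four bytes total); the buffer is built by hand here, but recordKey() produces the equivalent Struct:

    import org.apache.kafka.common.record.ControlRecordType;

    import java.nio.ByteBuffer;

    public class ControlKeyExample {
        public static void main(String[] args) {
            ByteBuffer key = ByteBuffer.allocate(4);
            key.putShort((short) 0); // version 0
            key.putShort((short) 1); // type 1 = ABORT (0 = COMMIT)
            key.flip();

            // Unrecognized type values parse as UNKNOWN and should be ignored by clients.
            ControlRecordType type = ControlRecordType.parse(key);
            System.out.println(type); // ABORT
        }
    }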

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
new file mode 100644
index 0000000..7c9b6f1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Crc32;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
+import static org.apache.kafka.common.utils.Utils.wrapNullable;
+
+/**
+ * This class implements the inner record format for magic 2 and above. The schema is as follows:
+ *
+ * Record =>
+ *   Length => Varint
+ *   Attributes => Int8
+ *   TimestampDelta => Varlong
+ *   OffsetDelta => Varint
+ *   Key => Bytes
+ *   Value => Bytes
+ *   Headers => [HeaderKey HeaderValue]
+ *     HeaderKey => String
+ *     HeaderValue => Bytes
+ *
+ * Note that in this schema, the Bytes and String types use a variable length integer to represent
+ * the length of the field. The array type used for the headers also uses a Varint for the number of
+ * headers.
+ *
+ * The current record attributes are depicted below:
+ *
+ *  -----------------------------------
+ *  | Unused (1-7) | Control Flag (0) |
+ *  -----------------------------------
+ *
+ * The control flag is used to implement control records (see {@link ControlRecordType}).
+ *
+ * The offset and timestamp deltas are computed relative to the base offset and
+ * base timestamp of the batch that this record is contained in.
+ */
+public class DefaultRecord implements Record {
+
+    // excluding key, value and headers: 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
+    public static final int MAX_RECORD_OVERHEAD = 21;
+
+    private static final int CONTROL_FLAG_MASK = 0x01;
+    private static final int NULL_VARINT_SIZE_BYTES = ByteUtils.sizeOfVarint(-1);
+
+    private final int sizeInBytes;
+    private final byte attributes;
+    private final long offset;
+    private final long timestamp;
+    private final int sequence;
+    private final ByteBuffer key;
+    private final ByteBuffer value;
+    private final Header[] headers;
+    private Long checksum = null;
+
+    private DefaultRecord(int sizeInBytes,
+                          byte attributes,
+                          long offset,
+                          long timestamp,
+                          int sequence,
+                          ByteBuffer key,
+                          ByteBuffer value,
+                          Header[] headers) {
+        this.sizeInBytes = sizeInBytes;
+        this.attributes = attributes;
+        this.offset = offset;
+        this.timestamp = timestamp;
+        this.sequence = sequence;
+        this.key = key;
+        this.value = value;
+        this.headers = headers;
+    }
+
+    @Override
+    public long offset() {
+        return offset;
+    }
+
+    @Override
+    public long sequence() {
+        return sequence;
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return sizeInBytes;
+    }
+
+    @Override
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public byte attributes() {
+        return attributes;
+    }
+
+    @Override
+    public long checksum() {
+        if (checksum == null)
+            checksum = computeChecksum(timestamp, key, value);
+        return checksum;
+    }
+
+    @Override
+    public boolean isValid() {
+        // new versions of the message format (2 and above) do not contain an individual record checksum;
+        // instead they are validated with the checksum at the log entry level
+        return true;
+    }
+
+    @Override
+    public void ensureValid() {}
+
+    @Override
+    public int keySize() {
+        return key == null ? -1 : key.remaining();
+    }
+
+    @Override
+    public int valueSize() {
+        return value == null ? -1 : value.remaining();
+    }
+
+    @Override
+    public boolean hasKey() {
+        return key != null;
+    }
+
+    @Override
+    public ByteBuffer key() {
+        return key == null ? null : key.duplicate();
+    }
+
+    @Override
+    public boolean hasValue() {
+        return value != null;
+    }
+
+    @Override
+    public ByteBuffer value() {
+        return value == null ? null : value.duplicate();
+    }
+
+    @Override
+    public Header[] headers() {
+        return headers;
+    }
+
+    /**
+     * Write the record to `out` and return its crc.
+     */
+    public static long writeTo(DataOutputStream out,
+                               boolean isControlRecord,
+                               int offsetDelta,
+                               long timestampDelta,
+                               ByteBuffer key,
+                               ByteBuffer value,
+                               Header[] headers) throws IOException {
+        int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
+        ByteUtils.writeVarint(sizeInBytes, out);
+
+        byte attributes = computeAttributes(isControlRecord);
+        out.write(attributes);
+
+        ByteUtils.writeVarlong(timestampDelta, out);
+        ByteUtils.writeVarint(offsetDelta, out);
+
+        if (key == null) {
+            ByteUtils.writeVarint(-1, out);
+        } else {
+            int keySize = key.remaining();
+            ByteUtils.writeVarint(keySize, out);
+            Utils.writeTo(out, key, keySize);
+        }
+
+        if (value == null) {
+            ByteUtils.writeVarint(-1, out);
+        } else {
+            int valueSize = value.remaining();
+            ByteUtils.writeVarint(valueSize, out);
+            Utils.writeTo(out, value, valueSize);
+        }
+
+        if (headers == null)
+            throw new IllegalArgumentException("Headers cannot be null");
+
+        ByteUtils.writeVarint(headers.length, out);
+
+        for (Header header : headers) {
+            String headerKey = header.key();
+            if (headerKey == null)
+                throw new IllegalArgumentException("Invalid null header key found in headers");
+
+            byte[] utf8Bytes = Utils.utf8(headerKey);
+            ByteUtils.writeVarint(utf8Bytes.length, out);
+            out.write(utf8Bytes);
+
+            ByteBuffer headerValue = header.value();
+            if (headerValue == null) {
+                ByteUtils.writeVarint(-1, out);
+            } else {
+                int headerValueSize = headerValue.remaining();
+                ByteUtils.writeVarint(headerValueSize, out);
+                Utils.writeTo(out, headerValue, headerValueSize);
+            }
+        }
+
+        return computeChecksum(timestampDelta, key, value);
+    }
+
+    /**
+     * Write the record to `out` and return its crc.
+     */
+    public static long writeTo(ByteBuffer out,
+                               boolean isControlRecord,
+                               int offsetDelta,
+                               long timestampDelta,
+                               ByteBuffer key,
+                               ByteBuffer value,
+                               Header[] headers) {
+        try {
+            return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), isControlRecord, offsetDelta,
+                    timestampDelta, key, value, headers);
+        } catch (IOException e) {
+            // cannot actually be raised by ByteBufferOutputStream
+            throw new IllegalStateException("Unexpected exception raised from ByteBufferOutputStream", e);
+        }
+    }
+
+    /**
+     * Compute the checksum of the record from the timestamp, key and value payloads
+     */
+    private static long computeChecksum(long timestamp,
+                                        ByteBuffer key,
+                                        ByteBuffer value) {
+        Crc32 crc = new Crc32();
+        crc.updateLong(timestamp);
+
+        if (key != null)
+            crc.update(key, key.remaining());
+
+        if (value != null)
+            crc.update(value, value.remaining());
+
+        return crc.getValue();
+    }
+
+    @Override
+    public boolean hasMagic(byte magic) {
+        return magic >= MAGIC_VALUE_V2;
+    }
+
+    @Override
+    public boolean isCompressed() {
+        return false;
+    }
+
+    @Override
+    public boolean hasTimestampType(TimestampType timestampType) {
+        return false;
+    }
+
+    @Override
+    public boolean isControlRecord() {
+        return (attributes & CONTROL_FLAG_MASK) != 0;
+    }
+
+    public static DefaultRecord readFrom(DataInputStream input,
+                                         long baseOffset,
+                                         long baseTimestamp,
+                                         int baseSequence,
+                                         Long logAppendTime) throws IOException {
+        int sizeOfBodyInBytes = ByteUtils.readVarint(input);
+        ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
+        input.readFully(recordBuffer.array(), recordBuffer.arrayOffset(), sizeOfBodyInBytes);
+        int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
+        return readFrom(recordBuffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+    }
+
+    public static DefaultRecord readFrom(ByteBuffer buffer,
+                                         long baseOffset,
+                                         long baseTimestamp,
+                                         int baseSequence,
+                                         Long logAppendTime) {
+        int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
+        if (buffer.remaining() < sizeOfBodyInBytes)
+            return null;
+
+        int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
+        return readFrom(buffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+    }
+
+    private static DefaultRecord readFrom(ByteBuffer buffer,
+                                          int sizeInBytes,
+                                          long baseOffset,
+                                          long baseTimestamp,
+                                          int baseSequence,
+                                          Long logAppendTime) {
+        byte attributes = buffer.get();
+        long timestampDelta = ByteUtils.readVarlong(buffer);
+        long timestamp = baseTimestamp + timestampDelta;
+        if (logAppendTime != null)
+            timestamp = logAppendTime;
+
+        int offsetDelta = ByteUtils.readVarint(buffer);
+        long offset = baseOffset + offsetDelta;
+        int sequence = baseSequence >= 0 ? baseSequence + offsetDelta : RecordBatch.NO_SEQUENCE;
+
+        ByteBuffer key = null;
+        int keySize = ByteUtils.readVarint(buffer);
+        if (keySize >= 0) {
+            key = buffer.slice();
+            key.limit(keySize);
+            buffer.position(buffer.position() + keySize);
+        }
+
+        ByteBuffer value = null;
+        int valueSize = ByteUtils.readVarint(buffer);
+        if (valueSize >= 0) {
+            value = buffer.slice();
+            value.limit(valueSize);
+            buffer.position(buffer.position() + valueSize);
+        }
+
+        int numHeaders = ByteUtils.readVarint(buffer);
+        if (numHeaders < 0)
+            throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
+
+        if (numHeaders == 0)
+            return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, Record.EMPTY_HEADERS);
+
+        Header[] headers = new Header[numHeaders];
+        for (int i = 0; i < numHeaders; i++) {
+            int headerKeySize = ByteUtils.readVarint(buffer);
+            if (headerKeySize < 0)
+                throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+
+            String headerKey = Utils.utf8(buffer, headerKeySize);
+            buffer.position(buffer.position() + headerKeySize);
+
+            ByteBuffer headerValue = null;
+            int headerValueSize = ByteUtils.readVarint(buffer);
+            if (headerValueSize >= 0) {
+                headerValue = buffer.slice();
+                headerValue.limit(headerValueSize);
+                buffer.position(buffer.position() + headerValueSize);
+            }
+
+            headers[i] = new Header(headerKey, headerValue);
+        }
+
+        return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers);
+    }
+
+    private static byte computeAttributes(boolean isControlRecord) {
+        return isControlRecord ? CONTROL_FLAG_MASK : (byte) 0;
+    }
+
+    public static int sizeInBytes(int offsetDelta,
+                                  long timestampDelta,
+                                  byte[] key,
+                                  byte[] value) {
+        return sizeInBytes(offsetDelta, timestampDelta, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+    }
+
+    public static int sizeInBytes(int offsetDelta,
+                                  long timestampDelta,
+                                  ByteBuffer key,
+                                  ByteBuffer value,
+                                  Header[] headers) {
+        int bodySize = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
+        return bodySize + ByteUtils.sizeOfVarint(bodySize);
+    }
+
+    private static int sizeOfBodyInBytes(int offsetDelta,
+                                         long timestampDelta,
+                                         ByteBuffer key,
+                                         ByteBuffer value,
+                                         Header[] headers) {
+        int size = 1; // always one byte for attributes
+        size += ByteUtils.sizeOfVarint(offsetDelta);
+        size += ByteUtils.sizeOfVarlong(timestampDelta);
+
+        int keySize = key == null ? -1 : key.remaining();
+        int valueSize = value == null ? -1 : value.remaining();
+        size += sizeOf(keySize, valueSize, headers);
+
+        return size;
+    }
+
+    private static int sizeOf(int keySize, int valueSize, Header[] headers) {
+        int size = 0;
+        if (keySize < 0)
+            size += NULL_VARINT_SIZE_BYTES;
+        else
+            size += ByteUtils.sizeOfVarint(keySize) + keySize;
+
+        if (valueSize < 0)
+            size += NULL_VARINT_SIZE_BYTES;
+        else
+            size += ByteUtils.sizeOfVarint(valueSize) + valueSize;
+
+        if (headers == null)
+            throw new IllegalArgumentException("Headers cannot be null");
+
+        size += ByteUtils.sizeOfVarint(headers.length);
+        for (Header header : headers) {
+            String headerKey = header.key();
+            if (headerKey == null)
+                throw new IllegalArgumentException("Invalid null header key found in headers");
+
+            int headerKeySize = Utils.utf8Length(headerKey);
+            size += ByteUtils.sizeOfVarint(headerKeySize) + headerKeySize;
+
+            ByteBuffer headerValue = header.value();
+            if (headerValue == null) {
+                size += NULL_VARINT_SIZE_BYTES;
+            } else {
+                int headerValueSize = headerValue.remaining();
+                size += ByteUtils.sizeOfVarint(headerValueSize) + headerValueSize;
+            }
+        }
+        return size;
+    }
+
+    static int recordSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
+        int keySize = key == null ? -1 : key.length;
+        int valueSize = value == null ? -1 : value.length;
+        return MAX_RECORD_OVERHEAD + sizeOf(keySize, valueSize, headers);
+    }
+
+}
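
Note: the size helpers above (sizeOfBodyInBytes, sizeOf) price each field as a varint length prefix plus the payload, with null encoded as a length of -1 in a single byte. The following standalone sketch illustrates that sizing logic, assuming the protobuf-style zigzag varint encoding used by the v2 record format; the class and method names are illustrative and not part of this patch.

// Standalone sketch (not from the patch): zigzag varint sizing as relied on by
// the sizeOfBodyInBytes/sizeOf helpers above. A null key or value encodes the
// length -1 as one zigzag varint byte; a non-null field costs the varint of its
// length plus the payload itself.
public class VarintSizingSketch {

    // Bytes needed to encode value as a zigzag-encoded varint (7 payload bits per byte).
    static int sizeOfVarint(int value) {
        int v = (value << 1) ^ (value >> 31); // zigzag keeps small magnitudes small
        int bytes = 1;
        while ((v & 0xffffff80) != 0) {
            bytes++;
            v >>>= 7;
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(sizeOfVarint(-1));  // 1 -> a null key/value costs one byte
        System.out.println(sizeOfVarint(10));  // 1 -> a 10-byte key costs 1 + 10 bytes overall
        System.out.println(sizeOfVarint(300)); // 2 -> larger lengths need a longer prefix
    }
}
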

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
new file mode 100644
index 0000000..1de568e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Crc32;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+
+/**
+ * RecordBatch implementation for magic 2 and above. The schema is given below:
+ *
+ * RecordBatch =>
+ *  BaseOffset => Int64
+ *  Length => Int32
+ *  PartitionLeaderEpoch => Int32
+ *  Magic => Int8
+ *  CRC => Uint32
+ *  Attributes => Int16
+ *  LastOffsetDelta => Int32
+ *  BaseTimestamp => Int64
+ *  MaxTimestamp => Int64
+ *  ProducerId => Int64
+ *  ProducerEpoch => Int16
+ *  BaseSequence => Int32
+ *  Records => [Record]
+ *
+ * Note that when compression is enabled (see attributes below), the compressed record data is serialized
+ * directly following the count of the number of records.
+ *
+ * The CRC covers the data from the attributes to the end of the batch. Note that it is
+ * located after the magic byte, which means that consumers must parse the magic byte before
+ * deciding how to interpret the bytes between the batch length and the magic byte. The reason that
+ * the partition leader epoch field is moved ahead of the CRC is to avoid the need to recompute the CRC
+ * for every batch that is received by the broker when this field is assigned.
+ *
+ * The current attributes are given below:
+ *
+ *  -----------------------------------------------------------------------------------
+ *  | Unused (5-15) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
+ *  -----------------------------------------------------------------------------------
+ */
+public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
+    static final int BASE_OFFSET_OFFSET = 0;
+    static final int BASE_OFFSET_LENGTH = 8;
+    static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH;
+    static final int LENGTH_LENGTH = 4;
+    static final int PARTITION_LEADER_EPOCH_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH;
+    static final int PARTITION_LEADER_EPOCH_LENGTH = 4;
+    static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH;
+    static final int MAGIC_LENGTH = 1;
+    static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
+    static final int CRC_LENGTH = 4;
+    static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
+    static final int ATTRIBUTE_LENGTH = 2;
+    static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    static final int LAST_OFFSET_DELTA_LENGTH = 4;
+    static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
+    static final int BASE_TIMESTAMP_LENGTH = 8;
+    static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
+    static final int MAX_TIMESTAMP_LENGTH = 8;
+    static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
+    static final int PRODUCER_ID_LENGTH = 8;
+    static final int PRODUCER_EPOCH_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_LENGTH;
+    static final int PRODUCER_EPOCH_LENGTH = 2;
+    static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH;
+    static final int BASE_SEQUENCE_LENGTH = 4;
+    static final int RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH;
+    static final int RECORDS_COUNT_LENGTH = 4;
+    static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
+    public static final int RECORD_BATCH_OVERHEAD = RECORDS_OFFSET;
+
+    private static final byte COMPRESSION_CODEC_MASK = 0x07;
+    private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
+    private static final byte TIMESTAMP_TYPE_MASK = 0x08;
+
+    private final ByteBuffer buffer;
+
+    DefaultRecordBatch(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    @Override
+    public byte magic() {
+        return buffer.get(MAGIC_OFFSET);
+    }
+
+    @Override
+    public void ensureValid() {
+        if (sizeInBytes() < RECORD_BATCH_OVERHEAD)
+            throw new InvalidRecordException("Record batch is corrupt (the size " + sizeInBytes() +
+                    "is smaller than the minimum allowed overhead " + RECORD_BATCH_OVERHEAD + ")");
+
+        if (!isValid())
+            throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+                    + ", computed crc = " + computeChecksum() + ")");
+    }
+
+    private long baseTimestamp() {
+        return buffer.getLong(BASE_TIMESTAMP_OFFSET);
+    }
+
+    @Override
+    public long maxTimestamp() {
+        return buffer.getLong(MAX_TIMESTAMP_OFFSET);
+    }
+
+    @Override
+    public TimestampType timestampType() {
+        return (attributes() & TIMESTAMP_TYPE_MASK) == 0 ? TimestampType.CREATE_TIME : TimestampType.LOG_APPEND_TIME;
+    }
+
+    @Override
+    public long baseOffset() {
+        return buffer.getLong(BASE_OFFSET_OFFSET);
+    }
+
+    @Override
+    public long lastOffset() {
+        return baseOffset() + lastOffsetDelta();
+    }
+
+    @Override
+    public long producerId() {
+        return buffer.getLong(PRODUCER_ID_OFFSET);
+    }
+
+    @Override
+    public short producerEpoch() {
+        return buffer.getShort(PRODUCER_EPOCH_OFFSET);
+    }
+
+    @Override
+    public int baseSequence() {
+        return buffer.getInt(BASE_SEQUENCE_OFFSET);
+    }
+
+    private int lastOffsetDelta() {
+        return buffer.getInt(LAST_OFFSET_DELTA_OFFSET);
+    }
+
+    @Override
+    public int lastSequence() {
+        return baseSequence() + lastOffsetDelta();
+    }
+
+    @Override
+    public CompressionType compressionType() {
+        return CompressionType.forId(attributes() & COMPRESSION_CODEC_MASK);
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return LOG_OVERHEAD + buffer.getInt(LENGTH_OFFSET);
+    }
+
+    private int count() {
+        return buffer.getInt(RECORDS_COUNT_OFFSET);
+    }
+
+    @Override
+    public Integer countOrNull() {
+        return count();
+    }
+
+    @Override
+    public void writeTo(ByteBuffer buffer) {
+        buffer.put(this.buffer.duplicate());
+    }
+
+    @Override
+    public boolean isTransactional() {
+        return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
+    }
+
+    @Override
+    public int partitionLeaderEpoch() {
+        return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
+    }
+
+    private Iterator<Record> compressedIterator() {
+        ByteBuffer buffer = this.buffer.duplicate();
+        buffer.position(RECORDS_OFFSET);
+        DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
+                new ByteBufferInputStream(buffer), magic()));
+
+        // TODO: An improvement for the consumer would be to only decompress the records
+        // we need to fill max.poll.records and leave the rest compressed.
+        int numRecords = count();
+        if (numRecords < 0)
+            throw new InvalidRecordException("Found invalid record count " + numRecords + " in magic v" +
+                    magic() + " batch");
+
+        List<Record> records = new ArrayList<>(numRecords);
+        try {
+            Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
+            long baseOffset = baseOffset();
+            long baseTimestamp = baseTimestamp();
+            int baseSequence = baseSequence();
+
+            for (int i = 0; i < numRecords; i++)
+                records.add(DefaultRecord.readFrom(stream, baseOffset, baseTimestamp, baseSequence, logAppendTime));
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        } finally {
+            Utils.closeQuietly(stream, "records iterator stream");
+        }
+
+        return records.iterator();
+    }
+
+    private Iterator<Record> uncompressedIterator() {
+        final ByteBuffer buffer = this.buffer.duplicate();
+        final Long logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
+        final long baseOffset = baseOffset();
+        final long baseTimestamp = baseTimestamp();
+        final int baseSequence = baseSequence();
+
+        buffer.position(RECORDS_OFFSET);
+        final int totalRecords = count();
+
+        return new Iterator<Record>() {
+            int readRecords = 0;
+
+            @Override
+            public boolean hasNext() {
+                return readRecords < totalRecords;
+            }
+
+            @Override
+            public Record next() {
+                readRecords++;
+                return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    @Override
+    public Iterator<Record> iterator() {
+        if (isCompressed())
+            return compressedIterator();
+        else
+            return uncompressedIterator();
+    }
+
+    @Override
+    public void setLastOffset(long offset) {
+        buffer.putLong(BASE_OFFSET_OFFSET, offset - lastOffsetDelta());
+    }
+
+    @Override
+    public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) {
+        long currentMaxTimestamp = maxTimestamp();
+        // We don't need to recompute crc if the timestamp is not updated.
+        if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
+            return;
+
+        byte attributes = computeAttributes(compressionType(), timestampType, isTransactional());
+        buffer.putShort(ATTRIBUTES_OFFSET, attributes);
+        buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
+        long crc = computeChecksum();
+        ByteUtils.writeUnsignedInt(buffer, CRC_OFFSET, crc);
+    }
+
+    @Override
+    public void setPartitionLeaderEpoch(int epoch) {
+        buffer.putInt(PARTITION_LEADER_EPOCH_OFFSET, epoch);
+    }
+
+    @Override
+    public long checksum() {
+        return ByteUtils.readUnsignedInt(buffer, CRC_OFFSET);
+    }
+
+    @Override
+    public boolean isValid() {
+        return sizeInBytes() >= RECORD_BATCH_OVERHEAD && checksum() == computeChecksum();
+    }
+
+    private long computeChecksum() {
+        return Crc32.crc32(buffer, ATTRIBUTES_OFFSET, buffer.limit() - ATTRIBUTES_OFFSET);
+    }
+
+    private byte attributes() {
+        // note we're not using the second byte of attributes
+        return (byte) buffer.getShort(ATTRIBUTES_OFFSET);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        DefaultRecordBatch that = (DefaultRecordBatch) o;
+        return buffer != null ? buffer.equals(that.buffer) : that.buffer == null;
+    }
+
+    @Override
+    public int hashCode() {
+        return buffer != null ? buffer.hashCode() : 0;
+    }
+
+    private static byte computeAttributes(CompressionType type, TimestampType timestampType, boolean isTransactional) {
+        if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
+            throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
+                    "format v2 and above");
+
+        byte attributes = isTransactional ? TRANSACTIONAL_FLAG_MASK : 0;
+        if (type.id > 0)
+            attributes |= COMPRESSION_CODEC_MASK & type.id;
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+            attributes |= TIMESTAMP_TYPE_MASK;
+        return attributes;
+    }
+
+    static void writeHeader(ByteBuffer buffer,
+                            long baseOffset,
+                            int lastOffsetDelta,
+                            int sizeInBytes,
+                            byte magic,
+                            CompressionType compressionType,
+                            TimestampType timestampType,
+                            long baseTimestamp,
+                            long maxTimestamp,
+                            long pid,
+                            short epoch,
+                            int sequence,
+                            boolean isTransactional,
+                            int partitionLeaderEpoch,
+                            int numRecords) {
+        if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
+            throw new IllegalArgumentException("Invalid magic value " + magic);
+        if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
+            throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);
+
+        short attributes = computeAttributes(compressionType, timestampType, isTransactional);
+
+        int position = buffer.position();
+        buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
+        buffer.putInt(position + LENGTH_OFFSET, sizeInBytes - LOG_OVERHEAD);
+        buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
+        buffer.put(position + MAGIC_OFFSET, magic);
+        buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
+        buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp);
+        buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
+        buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
+        buffer.putLong(position + PRODUCER_ID_OFFSET, pid);
+        buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch);
+        buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence);
+        buffer.putInt(position + RECORDS_COUNT_OFFSET, numRecords);
+        long crc = Crc32.crc32(buffer, ATTRIBUTES_OFFSET, sizeInBytes - ATTRIBUTES_OFFSET);
+        buffer.putInt(position + CRC_OFFSET, (int) crc);
+    }
+
+    @Override
+    public String toString() {
+        return "RecordBatch(magic: " + magic() + ", offsets: [" + baseOffset() + ", " + lastOffset() + "])";
+    }
+
+    public static int sizeInBytes(long baseOffset, Iterable<Record> records) {
+        Iterator<Record> iterator = records.iterator();
+        if (!iterator.hasNext())
+            return 0;
+
+        int size = RECORD_BATCH_OVERHEAD;
+        Long baseTimestamp = null;
+        while (iterator.hasNext()) {
+            Record record = iterator.next();
+            int offsetDelta = (int) (record.offset() - baseOffset);
+            if (baseTimestamp == null)
+                baseTimestamp = record.timestamp();
+            long timestampDelta = record.timestamp() - baseTimestamp;
+            size += DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
+                    record.headers());
+        }
+        return size;
+    }
+
+    public static int sizeInBytes(Iterable<SimpleRecord> records) {
+        Iterator<SimpleRecord> iterator = records.iterator();
+        if (!iterator.hasNext())
+            return 0;
+
+        int size = RECORD_BATCH_OVERHEAD;
+        int offsetDelta = 0;
+        Long baseTimestamp = null;
+        while (iterator.hasNext()) {
+            SimpleRecord record = iterator.next();
+            if (baseTimestamp == null)
+                baseTimestamp = record.timestamp();
+            long timestampDelta = record.timestamp() - baseTimestamp;
+            size += DefaultRecord.sizeInBytes(offsetDelta++, timestampDelta, record.key(), record.value(),
+                    record.headers());
+        }
+        return size;
+    }
+
+    /**
+     * Get an upper bound on the size of a batch with only a single record using a given key and value.
+     */
+    static int batchSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
+        return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
+    }
+
+}
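
For quick reference on the attribute bit layout documented in the DefaultRecordBatch javadoc above, the following standalone sketch unpacks an attributes value using the same masks the class defines. The class name and sample value are illustrative only; this is not code from the patch.

// Standalone sketch (not from the patch): decoding the v2 attributes field with
// the same bit layout as DefaultRecordBatch above (bits 0-2 compression codec,
// bit 3 timestamp type, bit 4 transactional, bits 5-15 currently unused).
public class AttributesDecoderSketch {
    static final int COMPRESSION_CODEC_MASK = 0x07;
    static final int TIMESTAMP_TYPE_MASK = 0x08;
    static final int TRANSACTIONAL_FLAG_MASK = 0x10;

    public static void main(String[] args) {
        short attributes = 0x1A;               // illustrative value: codec id 2, log-append time, transactional
        int low = attributes & 0xFF;           // only the low byte carries flags today
        int codecId = low & COMPRESSION_CODEC_MASK;
        boolean logAppendTime = (low & TIMESTAMP_TYPE_MASK) != 0;
        boolean transactional = (low & TRANSACTIONAL_FLAG_MASK) != 0;
        System.out.println("codec=" + codecId
                + " logAppendTime=" + logAppendTime
                + " transactional=" + transactional); // prints codec=2 logAppendTime=true transactional=true
    }
}
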

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 1461b55..59055ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -23,16 +23,19 @@ import org.apache.kafka.common.utils.Utils;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Iterator;
+
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 
 /**
  * A log input stream which is backed by a {@link FileChannel}.
  */
-public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelLogEntry> {
+public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> {
     private int position;
     private final int end;
     private final FileChannel channel;
     private final int maxRecordSize;
-    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
+    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(LOG_OVERHEAD);
 
     /**
      * Create a new log input stream over the FileChannel
@@ -41,10 +44,10 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
      * @param start Position in the file channel to start from
      * @param end Position in the file channel not to read past
      */
-    public FileLogInputStream(FileChannel channel,
-                              int maxRecordSize,
-                              int start,
-                              int end) {
+    FileLogInputStream(FileChannel channel,
+                       int maxRecordSize,
+                       int start,
+                       int end) {
         this.channel = channel;
         this.maxRecordSize = maxRecordSize;
         this.position = start;
@@ -52,8 +55,8 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
     }
 
     @Override
-    public FileChannelLogEntry nextEntry() throws IOException {
-        if (position + Records.LOG_OVERHEAD >= end)
+    public FileChannelRecordBatch nextBatch() throws IOException {
+        if (position + LOG_OVERHEAD >= end)
             return null;
 
         logHeaderBuffer.rewind();
@@ -63,45 +66,91 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
         long offset = logHeaderBuffer.getLong();
         int size = logHeaderBuffer.getInt();
 
-        if (size < Record.RECORD_OVERHEAD_V0)
-            throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", Record.RECORD_OVERHEAD_V0));
+        // V0 has the smallest overhead, stricter checking is done later
+        if (size < LegacyRecord.RECORD_OVERHEAD_V0)
+            throw new CorruptRecordException(String.format("Record size is smaller than minimum record overhead (%d).", LegacyRecord.RECORD_OVERHEAD_V0));
 
         if (size > maxRecordSize)
             throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxRecordSize));
 
-        if (position + Records.LOG_OVERHEAD + size > end)
+        if (position + LOG_OVERHEAD + size > end)
             return null;
 
-        FileChannelLogEntry logEntry = new FileChannelLogEntry(offset, channel, position, size);
-        position += logEntry.sizeInBytes();
-        return logEntry;
+        FileChannelRecordBatch batch = new FileChannelRecordBatch(offset, channel, position, size);
+        position += batch.sizeInBytes();
+        return batch;
     }
 
     /**
-     * Log entry backed by an underlying FileChannel. This allows iteration over the shallow log
-     * entries without needing to read the record data into memory until it is needed. The downside
+     * Record batch backed by an underlying FileChannel. This allows iteration over the record batches
+     * without needing to read the record data into memory until it is needed. The downside
      * is that entries will generally no longer be readable when the underlying channel is closed.
      */
-    public static class FileChannelLogEntry extends LogEntry {
+    public static class FileChannelRecordBatch extends AbstractRecordBatch {
         private final long offset;
         private final FileChannel channel;
         private final int position;
-        private final int recordSize;
-        private Record record = null;
+        private final int batchSize;
+        private RecordBatch underlying;
+        private Byte magic;
 
-        private FileChannelLogEntry(long offset,
-                                   FileChannel channel,
-                                   int position,
-                                   int recordSize) {
+        private FileChannelRecordBatch(long offset,
+                                       FileChannel channel,
+                                       int position,
+                                       int batchSize) {
             this.offset = offset;
             this.channel = channel;
             this.position = position;
-            this.recordSize = recordSize;
+            this.batchSize = batchSize;
+        }
+
+        @Override
+        public long baseOffset() {
+            if (magic() >= RecordBatch.MAGIC_VALUE_V2)
+                return offset;
+
+            loadUnderlyingRecordBatch();
+            return underlying.baseOffset();
         }
 
         @Override
-        public long offset() {
-            return offset;
+        public CompressionType compressionType() {
+            loadUnderlyingRecordBatch();
+            return underlying.compressionType();
+        }
+
+        @Override
+        public TimestampType timestampType() {
+            loadUnderlyingRecordBatch();
+            return underlying.timestampType();
+        }
+
+        @Override
+        public long maxTimestamp() {
+            loadUnderlyingRecordBatch();
+            return underlying.maxTimestamp();
+        }
+
+        @Override
+        public long lastOffset() {
+            if (magic() < RecordBatch.MAGIC_VALUE_V2)
+                return offset;
+            else if (underlying != null)
+                return underlying.lastOffset();
+
+            try {
+                // TODO: this logic probably should be moved into DefaultRecordBatch somehow
+                // maybe we just need two separate implementations
+
+                byte[] offsetDelta = new byte[4];
+                ByteBuffer buf = ByteBuffer.wrap(offsetDelta);
+                channel.read(buf, position + DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET);
+                if (buf.hasRemaining())
+                    throw new KafkaException("Failed to read magic byte from FileChannel " + channel);
+                return offset + buf.getInt(0);
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
         }
 
         public int position() {
@@ -110,52 +159,151 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
 
         @Override
         public byte magic() {
-            if (record != null)
-                return record.magic();
+            if (magic != null)
+                return magic;
+            if (underlying != null)
+                return underlying.magic();
 
             try {
-                byte[] magic = new byte[1];
-                ByteBuffer buf = ByteBuffer.wrap(magic);
-                Utils.readFullyOrFail(channel, buf, position + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET, "magic byte");
-                return magic[0];
+                ByteBuffer buf = ByteBuffer.wrap(new byte[1]);
+                Utils.readFullyOrFail(channel, buf, position + Records.MAGIC_OFFSET, "magic byte");
+                magic = buf.get(0);
+                return magic;
             } catch (IOException e) {
                 throw new KafkaException(e);
             }
         }
 
-        /**
-         * Force load the record and its data (key and value) into memory.
-         * @return The resulting record
-         * @throws IOException for any IO errors reading from the underlying file
-         */
-        private Record loadRecord() throws IOException {
-            if (record != null)
-                return record;
+        @Override
+        public long producerId() {
+            loadUnderlyingRecordBatch();
+            return underlying.producerId();
+        }
 
-            ByteBuffer recordBuffer = ByteBuffer.allocate(recordSize);
-            Utils.readFullyOrFail(channel, recordBuffer, position + Records.LOG_OVERHEAD, "full record");
+        @Override
+        public short producerEpoch() {
+            loadUnderlyingRecordBatch();
+            return underlying.producerEpoch();
+        }
 
-            recordBuffer.rewind();
-            record = new Record(recordBuffer);
-            return record;
+        @Override
+        public int baseSequence() {
+            loadUnderlyingRecordBatch();
+            return underlying.baseSequence();
         }
 
         @Override
-        public Record record() {
-            if (record != null)
-                return record;
+        public int lastSequence() {
+            loadUnderlyingRecordBatch();
+            return underlying.lastSequence();
+        }
 
+        private void loadUnderlyingRecordBatch() {
             try {
-                return loadRecord();
+                if (underlying != null)
+                    return;
+
+                ByteBuffer batchBuffer = ByteBuffer.allocate(sizeInBytes());
+                Utils.readFullyOrFail(channel, batchBuffer, position, "full record batch");
+                batchBuffer.rewind();
+
+                byte magic = batchBuffer.get(Records.MAGIC_OFFSET);
+                if (magic > RecordBatch.MAGIC_VALUE_V1)
+                    underlying = new DefaultRecordBatch(batchBuffer);
+                else
+                    underlying = new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchBuffer);
             } catch (IOException e) {
-                throw new KafkaException(e);
+                throw new KafkaException("Failed to load record batch at position " + position + " from file channel " + channel);
             }
         }
 
         @Override
+        public Iterator<Record> iterator() {
+            loadUnderlyingRecordBatch();
+            return underlying.iterator();
+        }
+
+        @Override
+        public boolean isValid() {
+            loadUnderlyingRecordBatch();
+            return underlying.isValid();
+        }
+
+        @Override
+        public void ensureValid() {
+            loadUnderlyingRecordBatch();
+            underlying.ensureValid();
+        }
+
+        @Override
+        public long checksum() {
+            loadUnderlyingRecordBatch();
+            return underlying.checksum();
+        }
+
+        @Override
         public int sizeInBytes() {
-            return Records.LOG_OVERHEAD + recordSize;
+            return LOG_OVERHEAD + batchSize;
+        }
+
+        @Override
+        public Integer countOrNull() {
+            loadUnderlyingRecordBatch();
+            return underlying.countOrNull();
+        }
+
+        @Override
+        public void writeTo(ByteBuffer buffer) {
+            try {
+                int limit = buffer.limit();
+                buffer.limit(buffer.position() + sizeInBytes());
+                Utils.readFully(channel, buffer, position);
+                buffer.limit(limit);
+            } catch (IOException e) {
+                throw new KafkaException("Failed to read record batch at position " + position + " from file channel " +
+                        channel, e);
+            }
         }
 
+        @Override
+        public boolean isTransactional() {
+            loadUnderlyingRecordBatch();
+            return underlying.isTransactional();
+        }
+
+        @Override
+        public int partitionLeaderEpoch() {
+            loadUnderlyingRecordBatch();
+            return underlying.partitionLeaderEpoch();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            FileChannelRecordBatch that = (FileChannelRecordBatch) o;
+
+            return offset == that.offset &&
+                    position == that.position &&
+                    batchSize == that.batchSize &&
+                    (channel == null ? that.channel == null : channel.equals(that.channel));
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) (offset ^ (offset >>> 32));
+            result = 31 * result + (channel != null ? channel.hashCode() : 0);
+            result = 31 * result + position;
+            result = 31 * result + batchSize;
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "FileChannelRecordBatch(magic: " + magic() + ", offsets: [" + baseOffset() + ", " + lastOffset() + "])";
+        }
     }
 }
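
The FileChannelRecordBatch changes above avoid loading a full batch into memory until a field actually requires it, fetching small header fields (such as the magic byte) with positional FileChannel reads. Below is a minimal standalone sketch of that access pattern, using plain java.nio rather than Kafka's Utils helpers; the file name is hypothetical and the offset 16 simply mirrors the magic byte position given by the DefaultRecordBatch constants.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Standalone sketch (not from the patch): read one header byte at an absolute
// offset without moving the channel's own position, so batch iteration stays lazy.
public class PositionalReadSketch {

    static byte readByteAt(FileChannel channel, long absoluteOffset) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(1);
        while (buf.hasRemaining()) {
            // FileChannel.read(ByteBuffer, long) reads at an absolute position
            // and leaves the channel position untouched.
            if (channel.read(buf, absoluteOffset + buf.position()) < 0)
                throw new IOException("Unexpected end of file at offset " + absoluteOffset);
        }
        return buf.get(0);
    }

    public static void main(String[] args) throws IOException {
        // "segment.log" is an illustrative file name only.
        try (FileChannel channel = FileChannel.open(Paths.get("segment.log"), StandardOpenOption.READ)) {
            System.out.println("magic=" + readByteAt(channel, 16L));
        }
    }
}
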

