kafka-commits mailing list archives

From: ij...@apache.org
Subject: [08/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
Date: Fri, 24 Mar 2017 19:44:01 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index f3cf43c..ab81bfe 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -17,15 +17,18 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import static org.apache.kafka.common.utils.Utils.wrapNullable;
+
 /**
  * This class is used to write new log data in memory, i.e. this is the write path for {@link MemoryRecords}.
- * It transparently handles compression and exposes methods for appending new entries, possibly with message
+ * It transparently handles compression and exposes methods for appending new records, possibly with message
  * format conversion.
  */
 public class MemoryRecordsBuilder {
@@ -53,15 +56,21 @@ public class MemoryRecordsBuilder {
     private final int initPos;
     private final long baseOffset;
     private final long logAppendTime;
+    private final long producerId;
+    private final short producerEpoch;
+    private final int baseSequence;
+    private final boolean isTransactional;
+    private final int partitionLeaderEpoch;
     private final int writeLimit;
     private final int initialCapacity;
 
     private long writtenUncompressed = 0;
-    private long numRecords = 0;
+    private int numRecords = 0;
     private float compressionRate = 1;
-    private long maxTimestamp = Record.NO_TIMESTAMP;
+    private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
     private long offsetOfMaxTimestamp = -1;
-    private long lastOffset = -1;
+    private Long lastOffset = null;
+    private Long baseTimestamp = null;
 
     private MemoryRecords builtRecords;
 
@@ -75,6 +84,10 @@ public class MemoryRecordsBuilder {
      * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
      * @param baseOffset The initial offset to use for the first appended record
      * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
+     * @param producerId The producer ID (PID) associated with the producer writing this record set
+     * @param producerEpoch The epoch of the producer
+     * @param baseSequence The sequence number of the first record in this set
+     * @param isTransactional Whether or not the records are part of a transaction
      * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded
      *                   when compression is used since size estimates are rough, and in the case that the first
      *                   record added exceeds the size).
@@ -85,23 +98,58 @@ public class MemoryRecordsBuilder {
                                 TimestampType timestampType,
                                 long baseOffset,
                                 long logAppendTime,
+                                long producerId,
+                                short producerEpoch,
+                                int baseSequence,
+                                boolean isTransactional,
+                                int partitionLeaderEpoch,
                                 int writeLimit) {
-        if (magic > Record.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
+        if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
             throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
 
+        if (isTransactional) {
+            if (producerId == RecordBatch.NO_PRODUCER_ID)
+                throw new IllegalArgumentException("Cannot write transactional messages without a valid producer ID");
+
+            if (magic < RecordBatch.MAGIC_VALUE_V2)
+                throw new IllegalArgumentException("Transactional messages are not supported for magic " + magic);
+        }
+
+        if (producerId != RecordBatch.NO_PRODUCER_ID) {
+            if (producerEpoch < 0)
+                throw new IllegalArgumentException("Invalid negative producer epoch");
+
+            if (baseSequence < 0)
+                throw new IllegalArgumentException("Invalid negative sequence number used");
+
+            if (magic < RecordBatch.MAGIC_VALUE_V2)
+                throw new IllegalArgumentException("Idempotent messages are not supported for magic " + magic);
+        }
+
         this.magic = magic;
         this.timestampType = timestampType;
         this.compressionType = compressionType;
         this.baseOffset = baseOffset;
         this.logAppendTime = logAppendTime;
         this.initPos = buffer.position();
+        this.numRecords = 0;
+        this.writtenUncompressed = 0;
+        this.compressionRate = 1;
+        this.maxTimestamp = RecordBatch.NO_TIMESTAMP;
+        this.producerId = producerId;
+        this.producerEpoch = producerEpoch;
+        this.baseSequence = baseSequence;
+        this.isTransactional = isTransactional;
+        this.partitionLeaderEpoch = partitionLeaderEpoch;
         this.writeLimit = writeLimit;
         this.initialCapacity = buffer.capacity();
 
-        if (compressionType != CompressionType.NONE) {
+        if (magic > RecordBatch.MAGIC_VALUE_V1) {
+            buffer.position(initPos + DefaultRecordBatch.RECORDS_OFFSET);
+        } else if (compressionType != CompressionType.NONE) {
             // for compressed records, leave space for the header and the shallow message metadata
             // and move the starting position to the value payload offset
-            buffer.position(initPos + Records.LOG_OVERHEAD + Record.recordOverhead(magic));
+            buffer.position(initPos + Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic));
         }
 
         // create the stream
@@ -139,8 +187,8 @@ public class MemoryRecordsBuilder {
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME)
             return new RecordsInfo(logAppendTime, lastOffset);
-        else if (maxTimestamp == Record.NO_TIMESTAMP)
-            return new RecordsInfo(Record.NO_TIMESTAMP, lastOffset);
+        else if (maxTimestamp == RecordBatch.NO_TIMESTAMP)
+            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
         else
             return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset);
     }
@@ -159,8 +207,10 @@ public class MemoryRecordsBuilder {
             buffer().position(initPos);
             builtRecords = MemoryRecords.EMPTY;
         } else {
-            if (compressionType != CompressionType.NONE)
-                writerCompressedWrapperHeader();
+            if (magic > RecordBatch.MAGIC_VALUE_V1)
+                writeDefaultBatchHeader();
+            else if (compressionType != CompressionType.NONE)
+                writeLegacyCompressedWrapperHeader();
 
             ByteBuffer buffer = buffer().duplicate();
             buffer.flip();
@@ -169,17 +219,41 @@ public class MemoryRecordsBuilder {
         }
     }
 
-    private void writerCompressedWrapperHeader() {
+    private void writeDefaultBatchHeader() {
+        ByteBuffer buffer = bufferStream.buffer();
+        int pos = buffer.position();
+        buffer.position(initPos);
+        int size = pos - initPos;
+        int offsetDelta = (int) (lastOffset - baseOffset);
+
+        final long baseTimestamp;
+        final long maxTimestamp;
+        if (timestampType == TimestampType.LOG_APPEND_TIME) {
+            baseTimestamp = logAppendTime;
+            maxTimestamp = logAppendTime;
+        } else {
+            baseTimestamp = this.baseTimestamp;
+            maxTimestamp = this.maxTimestamp;
+        }
+
+        DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
+                baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional,
+                partitionLeaderEpoch, numRecords);
+
+        buffer.position(pos);
+    }
+
+    private void writeLegacyCompressedWrapperHeader() {
         ByteBuffer buffer = bufferStream.buffer();
         int pos = buffer.position();
         buffer.position(initPos);
 
         int wrapperSize = pos - initPos - Records.LOG_OVERHEAD;
-        int writtenCompressed = wrapperSize - Record.recordOverhead(magic);
-        LogEntry.writeHeader(buffer, lastOffset, wrapperSize);
+        int writtenCompressed = wrapperSize - LegacyRecord.recordOverhead(magic);
+        AbstractLegacyRecordBatch.writeHeader(buffer, lastOffset, wrapperSize);
 
         long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? logAppendTime : maxTimestamp;
-        Record.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType);
+        LegacyRecord.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType);
 
         buffer.position(pos);
 
@@ -189,86 +263,149 @@ public class MemoryRecordsBuilder {
             compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
     }
 
+    private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
+                                 ByteBuffer value, Header[] headers) {
+        try {
+            if (lastOffset != null && offset <= lastOffset)
+                throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+
+            if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
+                throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);
+
+            if (magic < RecordBatch.MAGIC_VALUE_V2) {
+                if (isControlRecord)
+                    throw new IllegalArgumentException("Magic v" + magic + " does not support control records");
+                if (headers != null && headers.length > 0)
+                    throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
+            }
+
+            if (baseTimestamp == null)
+                baseTimestamp = timestamp;
+
+            if (magic > RecordBatch.MAGIC_VALUE_V1)
+                return appendDefaultRecord(offset, isControlRecord, timestamp, key, value, headers);
+            else
+                return appendLegacyRecord(offset, timestamp, key, value);
+        } catch (IOException e) {
+            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
+        }
+    }
+
     /**
      * Append a new record at the given offset.
      * @param offset The absolute offset of the record in the log buffer
      * @param timestamp The record timestamp
      * @param key The record key
      * @param value The record value
+     * @param headers The record headers if there are any
      * @return crc of the record
      */
-    public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
-        try {
-            if (lastOffset >= 0 && offset <= lastOffset)
-                throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+    public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) {
+        return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), headers);
+    }
 
-            int size = Record.recordSize(magic, key, value);
-            LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);
+    /**
+     * Append a new record at the given offset.
+     * @param offset The absolute offset of the record in the log buffer
+     * @param timestamp The record timestamp
+     * @param key The record key
+     * @param value The record value
+     * @param headers The record headers if there are any
+     * @return crc of the record
+     */
+    public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
+        return appendWithOffset(offset, false, timestamp, key, value, headers);
+    }
 
-            if (timestampType == TimestampType.LOG_APPEND_TIME)
-                timestamp = logAppendTime;
-            long crc = Record.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
-            recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
-            return crc;
-        } catch (IOException e) {
-            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
-        }
+    /**
+     * Append a new record at the given offset.
+     * @param offset The absolute offset of the record in the log buffer
+     * @param timestamp The record timestamp
+     * @param key The record key
+     * @param value The record value
+     * @return crc of the record
+     */
+    public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
+        return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
     }
 
     /**
-     * Append a new record at the next consecutive offset. If no records have been appended yet, use the base
-     * offset of this builder.
+     * Append a new record at the given offset.
+     * @param offset The absolute offset of the record in the log buffer
      * @param timestamp The record timestamp
      * @param key The record key
      * @param value The record value
      * @return crc of the record
      */
-    public long append(long timestamp, byte[] key, byte[] value) {
-        return appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value);
+    public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
+        return appendWithOffset(offset, false, timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
     /**
-     * Add the record at the next consecutive offset, converting to the desired magic value if necessary.
-     * @param record The record to add
+     * Append a new record at the given offset.
+     * @param offset The absolute offset of the record in the log buffer
+     * @param record The record to append
+     * @return crc of the record
      */
-    public void convertAndAppend(Record record) {
-        convertAndAppendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, record);
+    public long appendWithOffset(long offset, SimpleRecord record) {
+        return appendWithOffset(offset, false, record.timestamp(), record.key(), record.value(), record.headers());
     }
 
     /**
-     * Add the record at the given offset, converting to the desired magic value if necessary.
-     * @param offset The offset of the record
-     * @param record The record to add
+     * Append a new record at the next sequential offset.
+     * @param timestamp The record timestamp
+     * @param key The record key
+     * @param value The record value
+     * @return crc of the record
      */
-    public void convertAndAppendWithOffset(long offset, Record record) {
-        if (magic == record.magic()) {
-            appendWithOffset(offset, record);
-            return;
-        }
+    public long append(long timestamp, ByteBuffer key, ByteBuffer value) {
+        return appendWithOffset(nextSequentialOffset(), false, timestamp, key, value, Record.EMPTY_HEADERS);
+    }
 
-        if (lastOffset >= 0 && offset <= lastOffset)
-            throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
+    /**
+     * Append a new record at the next sequential offset.
+     * @param timestamp The record timestamp
+     * @param key The record key
+     * @param value The record value
+     * @return crc of the record
+     */
+    public long append(long timestamp, byte[] key, byte[] value) {
+        return append(timestamp, wrapNullable(key), wrapNullable(value));
+    }
 
-        try {
-            int size = record.convertedSize(magic);
-            LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);
-            long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? logAppendTime : record.timestamp();
-            record.convertTo(appendStream, magic, timestamp, timestampType);
-            recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
-        } catch (IOException e) {
-            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
-        }
+    /**
+     * Append a new record at the next sequential offset.
+     * @param record The record to append
+     * @return crc of the record
+     */
+    public long append(SimpleRecord record) {
+        return appendWithOffset(nextSequentialOffset(), record);
+    }
+
+    /**
+     * Append a control record at the next sequential offset.
+     * @param timestamp The record timestamp
+     * @param type The control record type (cannot be UNKNOWN)
+     * @param value The control record value
+     * @return crc of the record
+     */
+    public long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
+        Struct keyStruct = type.recordKey();
+        ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
+        keyStruct.writeTo(key);
+        key.flip();
+        return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
     }
 
     /**
-     * Add a record without doing offset/magic validation (this should only be used in testing).
+     * Add a legacy record without doing offset/magic validation (this should only be used in testing).
      * @param offset The offset of the record
      * @param record The record to add
      */
-    public void appendUnchecked(long offset, Record record) {
+    public void appendUncheckedWithOffset(long offset, LegacyRecord record) {
         try {
             int size = record.sizeInBytes();
-            LogEntry.writeHeader(appendStream, toInnerOffset(offset), size);
+            AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);
 
             ByteBuffer buffer = record.buffer().duplicate();
             appendStream.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
@@ -280,17 +417,32 @@ public class MemoryRecordsBuilder {
     }
 
     /**
-     * Add a record with a given offset. The record must have a magic which matches the magic use to
-     * construct this builder and the offset must be greater than the last appended entry.
+     * Append a record, using the offset and timestamp it already carries.
+     * @param record The record to append
+     */
+    public void append(Record record) {
+        appendWithOffset(record.offset(), record.isControlRecord(), record.timestamp(), record.key(), record.value(),
+                record.headers());
+    }
+
+    /**
+     * Append a record at the given offset, ignoring the offset stored in the record itself.
      * @param offset The offset of the record
      * @param record The record to add
      */
     public void appendWithOffset(long offset, Record record) {
-        if (record.magic() != magic)
-            throw new IllegalArgumentException("Inner log entries must have matching magic values as the wrapper");
-        if (lastOffset >= 0 && offset <= lastOffset)
-            throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset));
-        appendUnchecked(offset, record);
+        appendWithOffset(offset, record.isControlRecord(), record.timestamp(), record.key(), record.value(),
+                record.headers());
+    }
+
+    /**
+     * Add a record with a given offset. The record must have a magic which matches the magic used to
+     * construct this builder and the offset must be greater than the last appended record.
+     * @param offset The offset of the record
+     * @param record The record to add
+     */
+    public void appendWithOffset(long offset, LegacyRecord record) {
+        appendWithOffset(offset, record.timestamp(), record.key(), record.value());
     }
 
     /**
@@ -298,8 +450,32 @@ public class MemoryRecordsBuilder {
      * offset of this builder.
      * @param record The record to add
      */
-    public void append(Record record) {
-        appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, record);
+    public void append(LegacyRecord record) {
+        appendWithOffset(nextSequentialOffset(), record);
+    }
+
+    private long appendDefaultRecord(long offset, boolean isControlRecord, long timestamp,
+                                     ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException {
+        int offsetDelta = (int) (offset - baseOffset);
+        long timestampDelta = timestamp - baseTimestamp;
+        long crc = DefaultRecord.writeTo(appendStream, isControlRecord, offsetDelta, timestampDelta, key, value, headers);
+        // TODO: The crc is useless for the new message format. Maybe we should let writeTo return the written size?
+        recordWritten(offset, timestamp, DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, key, value, headers));
+        return crc;
+    }
+
+    private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
+        if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME)
+            timestamp = logAppendTime;
+
+        int size = LegacyRecord.recordSize(magic, key, value);
+        AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);
+
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+            timestamp = logAppendTime;
+        long crc = LegacyRecord.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
+        recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
+        return crc;
     }
 
     private long toInnerOffset(long offset) {
@@ -310,11 +486,17 @@ public class MemoryRecordsBuilder {
     }
 
     private void recordWritten(long offset, long timestamp, int size) {
+        if (numRecords == Integer.MAX_VALUE)
+            throw new IllegalArgumentException("Maximum number of records per batch exceeded, max records: " + Integer.MAX_VALUE);
+        if (offset - baseOffset > Integer.MAX_VALUE)
+            throw new IllegalArgumentException("Maximum offset delta exceeded, base offset: " + baseOffset +
+                    ", last offset: " + offset);
+
         numRecords += 1;
         writtenUncompressed += size;
         lastOffset = offset;
 
-        if (timestamp > maxTimestamp) {
+        if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) {
             maxTimestamp = timestamp;
             offsetOfMaxTimestamp = offset;
         }
@@ -345,10 +527,22 @@ public class MemoryRecordsBuilder {
      * the checking should be based on the capacity of the initialized buffer rather than the write limit in order
      * to accept this single record.
      */
-    public boolean hasRoomFor(byte[] key, byte[] value) {
-        return !isFull() && (numRecords == 0 ?
-                this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(magic, key, value) :
-                this.writeLimit >= estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(magic, key, value));
+    public boolean hasRoomFor(long timestamp, byte[] key, byte[] value) {
+        if (isFull())
+            return false;
+
+        final int recordSize;
+        if (magic < RecordBatch.MAGIC_VALUE_V2) {
+            recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
+        } else {
+            int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
+            long timestampDelta = baseTimestamp == null ? 0 : timestamp - baseTimestamp;
+            recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value);
+        }
+
+        return numRecords == 0 ?
+                this.initialCapacity >= recordSize :
+                this.writeLimit >= estimatedBytesWritten() + recordSize;
     }
 
     public boolean isClosed() {
@@ -365,6 +559,14 @@ public class MemoryRecordsBuilder {
         return builtRecords != null ? builtRecords.sizeInBytes() : estimatedBytesWritten();
     }
 
+    public byte magic() {
+        return magic;
+    }
+
+    private long nextSequentialOffset() {
+        return lastOffset == null ? baseOffset : lastOffset + 1;
+    }
+
     public static class RecordsInfo {
         public final long maxTimestamp;
         public final long shallowOffsetOfMaxTimestamp;

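To make the new builder parameters concrete, here is a minimal sketch that builds a small magic v2 batch for an idempotent (non-transactional) producer. It is a sketch only: the leading buffer/magic/compression constructor parameters and the build() call are assumed from the field assignments and builtRecords handling above rather than shown directly in the hunks, and the producer id, epoch and sequence values are purely illustrative.

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class IdempotentBatchSketch {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Constructor argument order follows the hunk above; the first three
        // arguments (buffer, magic, compression type) are assumed from the
        // assignments to this.magic, this.compressionType and buffer.position().
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                buffer,
                RecordBatch.MAGIC_VALUE_V2,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                0L,                                          // baseOffset
                RecordBatch.NO_TIMESTAMP,                    // logAppendTime (unused with CREATE_TIME)
                42L,                                         // producerId (PID), illustrative
                (short) 0,                                   // producerEpoch
                0,                                           // baseSequence of the first record
                false,                                       // isTransactional
                RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,  // set later by the broker
                buffer.capacity());                          // writeLimit

        long now = System.currentTimeMillis();
        builder.append(now, "k1".getBytes(), "v1".getBytes());
        builder.append(now, (byte[]) null, "v2".getBytes()); // null keys are allowed

        // build() is assumed to close the builder, which triggers writeDefaultBatchHeader()
        // and fills in the v2 batch header ahead of the appended records.
        MemoryRecords records = builder.build();
        System.out.println("built " + records.sizeInBytes() + " bytes");
    }
}

Setting isTransactional to true would additionally require a valid producer id and magic v2, per the constructor checks above, and the transactional flag would then be carried in the batch header written on close.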
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
new file mode 100644
index 0000000..2f0a96c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+/**
+ * A mutable record batch is one that can be modified in place (without copying). This is used by the broker
+ * to override certain fields in the batch before appending it to the log.
+ */
+public interface MutableRecordBatch extends RecordBatch {
+
+    /**
+     * Set the last offset of this batch.
+     * @param offset The last offset to use
+     */
+    void setLastOffset(long offset);
+
+    /**
+     * Set the max timestamp for this batch. When using log append time, this effectively overrides the individual
+     * timestamps of all the records contained in the batch. Note that this typically requires re-computation
+     * of the batch's CRC.
+     * @param timestampType The timestamp type
+     * @param maxTimestamp The maximum timestamp
+     */
+    void setMaxTimestamp(TimestampType timestampType, long maxTimestamp);
+
+    /**
+     * Set the partition leader epoch for this batch of records.
+     * @param epoch The partition leader epoch to use
+     */
+    void setPartitionLeaderEpoch(int epoch);
+}

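The interface targets the broker's append path, where the final offsets, the append timestamp and the partition leader epoch are only known once the batch reaches the log. Below is a minimal sketch of that pattern, using a hypothetical helper method; the real caller lives in the broker's log layer and is not part of this diff.

import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.TimestampType;

class BatchPatcher {
    // Hypothetical stand-in for the broker-side code that patches a batch in place
    // after offsets have been assigned, instead of rewriting the whole batch.
    void finalizeForAppend(MutableRecordBatch batch, long lastAssignedOffset,
                           long appendTimeMs, int leaderEpoch) {
        batch.setLastOffset(lastAssignedOffset);
        // Effectively overrides the timestamps of the contained records; as noted in the
        // javadoc above, this typically requires recomputing the batch CRC.
        batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, appendTimeMs);
        batch.setPartitionLeaderEpoch(leaderEpoch);
    }
}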
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 9932238..437ee3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -16,635 +16,133 @@
  */
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.apache.kafka.common.utils.ByteUtils;
-import org.apache.kafka.common.utils.Crc32;
-import org.apache.kafka.common.utils.Utils;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.utils.Utils.wrapNullable;
-
 /**
- * A record: a serialized key and value along with the associated CRC and other fields
+ * A log record is a tuple consisting of a unique offset in the log, a sequence number assigned by
+ * the producer, a timestamp, a key and a value.
  */
-public final class Record {
-
-    /**
-     * The current offset and size for all the fixed-length fields
-     */
-    public static final int CRC_OFFSET = 0;
-    public static final int CRC_LENGTH = 4;
-    public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
-    public static final int MAGIC_LENGTH = 1;
-    public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
-    public static final int ATTRIBUTE_LENGTH = 1;
-    public static final int TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
-    public static final int TIMESTAMP_LENGTH = 8;
-    public static final int KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
-    public static final int KEY_SIZE_OFFSET_V1 = TIMESTAMP_OFFSET + TIMESTAMP_LENGTH;
-    public static final int KEY_SIZE_LENGTH = 4;
-    public static final int KEY_OFFSET_V0 = KEY_SIZE_OFFSET_V0 + KEY_SIZE_LENGTH;
-    public static final int KEY_OFFSET_V1 = KEY_SIZE_OFFSET_V1 + KEY_SIZE_LENGTH;
-    public static final int VALUE_SIZE_LENGTH = 4;
-
-    /**
-     * The size for the record header
-     */
-    public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
-
-    /**
-     * The amount of overhead bytes in a record
-     */
-    public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+public interface Record {
 
-    /**
-     * The amount of overhead bytes in a record
-     */
-    public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+    Header[] EMPTY_HEADERS = new Header[0];
 
     /**
-     * The "magic" values
+     * The offset of this record in the log
+     * @return the offset
      */
-    public static final byte MAGIC_VALUE_V0 = 0;
-    public static final byte MAGIC_VALUE_V1 = 1;
+    long offset();
 
     /**
-     * The current "magic" value
+     * Get the sequence number assigned by the producer.
+     * @return the sequence number
      */
-    public static final byte CURRENT_MAGIC_VALUE = MAGIC_VALUE_V1;
+    long sequence();
 
     /**
-     * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
-     * compression
+     * Get the size in bytes of this record.
+     * @return the size of the record in bytes
      */
-    public static final int COMPRESSION_CODEC_MASK = 0x07;
+    int sizeInBytes();
 
     /**
-     * Specify the mask of timestamp type.
-     * 0 for CreateTime, 1 for LogAppendTime.
+     * Get the record's timestamp.
+     * @return the record's timestamp
      */
-    public static final byte TIMESTAMP_TYPE_MASK = 0x08;
-    public static final int TIMESTAMP_TYPE_ATTRIBUTE_OFFSET = 3;
+    long timestamp();
 
     /**
-     * Timestamp value for records without a timestamp
+     * Get a checksum of the record contents.
+     * @return a 4-byte unsigned checksum represented as a long
      */
-    public static final long NO_TIMESTAMP = -1L;
-
-    private final ByteBuffer buffer;
-    private final Long wrapperRecordTimestamp;
-    private final TimestampType wrapperRecordTimestampType;
-
-    public Record(ByteBuffer buffer) {
-        this(buffer, null, null);
-    }
-
-    public Record(ByteBuffer buffer, Long wrapperRecordTimestamp, TimestampType wrapperRecordTimestampType) {
-        this.buffer = buffer;
-        this.wrapperRecordTimestamp = wrapperRecordTimestamp;
-        this.wrapperRecordTimestampType = wrapperRecordTimestampType;
-    }
+    long checksum();
 
     /**
-     * Compute the checksum of the record from the record contents
+     * Check whether the record has a valid checksum.
+     * @return true if the record has a valid checksum, false otherwise
      */
-    public long computeChecksum() {
-        return Utils.computeChecksum(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
-    }
+    boolean isValid();
 
     /**
-     * Retrieve the previously computed CRC for this record
+     * Raise a {@link org.apache.kafka.common.errors.CorruptRecordException} if the record does not have a valid checksum.
      */
-    public long checksum() {
-        return ByteUtils.readUnsignedInt(buffer, CRC_OFFSET);
-    }
+    void ensureValid();
 
     /**
-     * Returns true if the crc stored with the record matches the crc computed off the record contents
+     * Get the size in bytes of the key.
+     * @return the size of the key, or -1 if there is no key
      */
-    public boolean isValid() {
-        return sizeInBytes() >= CRC_LENGTH && checksum() == computeChecksum();
-    }
-
-    public Long wrapperRecordTimestamp() {
-        return wrapperRecordTimestamp;
-    }
-
-    public TimestampType wrapperRecordTimestampType() {
-        return wrapperRecordTimestampType;
-    }
+    int keySize();
 
     /**
-     * Throw an InvalidRecordException if isValid is false for this record
+     * Check whether this record has a key
+     * @return true if there is a key, false otherwise
      */
-    public void ensureValid() {
-        if (!isValid()) {
-            if (sizeInBytes() < CRC_LENGTH)
-                throw new InvalidRecordException("Record is corrupt (crc could not be retrieved as the record is too "
-                        + "small, size = " + sizeInBytes() + ")");
-            else
-                throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
-                        + ", computed crc = " + computeChecksum() + ")");
-        }
-    }
+    boolean hasKey();
 
     /**
-     * The complete serialized size of this record in bytes (including crc, header attributes, etc), but
-     * excluding the log overhead (offset and record size).
-     * @return the size in bytes
+     * Get the record's key.
+     * @return the key or null if there is none
      */
-    public int sizeInBytes() {
-        return buffer.limit();
-    }
+    ByteBuffer key();
 
     /**
-     * The length of the key in bytes
-     * @return the size in bytes of the key (0 if the key is null)
+     * Get the size in bytes of the value.
+     * @return the size of the value, or -1 if the value is null
      */
-    public int keySize() {
-        if (magic() == MAGIC_VALUE_V0)
-            return buffer.getInt(KEY_SIZE_OFFSET_V0);
-        else
-            return buffer.getInt(KEY_SIZE_OFFSET_V1);
-    }
+    int valueSize();
 
     /**
-     * Does the record have a key?
+     * Check whether a value is present (i.e. if the value is not null)
      * @return true if so, false otherwise
      */
-    public boolean hasKey() {
-        return keySize() >= 0;
-    }
-
-    /**
-     * The position where the value size is stored
-     */
-    private int valueSizeOffset() {
-        if (magic() == MAGIC_VALUE_V0)
-            return KEY_OFFSET_V0 + Math.max(0, keySize());
-        else
-            return KEY_OFFSET_V1 + Math.max(0, keySize());
-    }
-
-    /**
-     * The length of the value in bytes
-     * @return the size in bytes of the value (0 if the value is null)
-     */
-    public int valueSize() {
-        return buffer.getInt(valueSizeOffset());
-    }
+    boolean hasValue();
 
     /**
-     * Check whether the value field of this record is null.
-     * @return true if the value is null, false otherwise
+     * Get the record's value
+     * @return the (nullable) value
      */
-    public boolean hasNullValue() {
-        return valueSize() < 0;
-    }
+    ByteBuffer value();
 
     /**
-     * The magic value (i.e. message format version) of this record
-     * @return the magic value
-     */
-    public byte magic() {
-        return buffer.get(MAGIC_OFFSET);
-    }
-
-    /**
-     * The attributes stored with this record
-     * @return the attributes
-     */
-    public byte attributes() {
-        return buffer.get(ATTRIBUTES_OFFSET);
-    }
-
-    /**
-     * When magic value is greater than 0, the timestamp of a record is determined in the following way:
-     * 1. wrapperRecordTimestampType = null and wrapperRecordTimestamp is null - Uncompressed message, timestamp is in the message.
-     * 2. wrapperRecordTimestampType = LOG_APPEND_TIME and WrapperRecordTimestamp is not null - Compressed message using LOG_APPEND_TIME
-     * 3. wrapperRecordTimestampType = CREATE_TIME and wrapperRecordTimestamp is not null - Compressed message using CREATE_TIME
+     * Check whether the record has a particular magic. For versions prior to 2, the record contains its own magic,
+     * so this function can be used to check whether it matches a particular value. For version 2 and above, this
+     * method returns true if the passed magic is greater than or equal to 2.
      *
-     * @return the timestamp as determined above
+     * @param magic the magic value to check
+     * @return true if the record has a magic field (versions prior to 2) and the value matches
      */
-    public long timestamp() {
-        if (magic() == MAGIC_VALUE_V0)
-            return NO_TIMESTAMP;
-        else {
-            // case 2
-            if (wrapperRecordTimestampType == TimestampType.LOG_APPEND_TIME && wrapperRecordTimestamp != null)
-                return wrapperRecordTimestamp;
-            // Case 1, 3
-            else
-                return buffer.getLong(TIMESTAMP_OFFSET);
-        }
-    }
+    boolean hasMagic(byte magic);
 
     /**
-     * Get the timestamp type of the record.
-     *
-     * @return The timestamp type or {@link TimestampType#NO_TIMESTAMP_TYPE} if the magic is 0.
-     */
-    public TimestampType timestampType() {
-        if (magic() == 0)
-            return TimestampType.NO_TIMESTAMP_TYPE;
-        else
-            return wrapperRecordTimestampType == null ? TimestampType.forAttributes(attributes()) : wrapperRecordTimestampType;
-    }
-
-    /**
-     * The compression type used with this record
+     * For versions prior to 2, check whether the record is compressed (and therefore
+     * has nested record content). For versions 2 and above, this always returns false.
+     * @return true if the magic is lower than 2 and the record is compressed
      */
-    public CompressionType compressionType() {
-        return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
-    }
-
-    /**
-     * A ByteBuffer containing the value of this record
-     * @return the value or null if the value for this record is null
-     */
-    public ByteBuffer value() {
-        return Utils.sizeDelimited(buffer, valueSizeOffset());
-    }
-
-    /**
-     * A ByteBuffer containing the message key
-     * @return the buffer or null if the key for this record is null
-     */
-    public ByteBuffer key() {
-        if (magic() == MAGIC_VALUE_V0)
-            return Utils.sizeDelimited(buffer, KEY_SIZE_OFFSET_V0);
-        else
-            return Utils.sizeDelimited(buffer, KEY_SIZE_OFFSET_V1);
-    }
-
-    /**
-     * Get the underlying buffer backing this record instance.
-     *
-     * @return the buffer
-     */
-    public ByteBuffer buffer() {
-        return this.buffer;
-    }
-
-    public String toString() {
-        if (magic() > 0)
-            return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, %s = %d, key = %d bytes, value = %d bytes)",
-                                 magic(),
-                                 attributes(),
-                                 compressionType(),
-                                 checksum(),
-                                 timestampType(),
-                                 timestamp(),
-                                 key() == null ? 0 : key().limit(),
-                                 value() == null ? 0 : value().limit());
-        else
-            return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
-                                 magic(),
-                                 attributes(),
-                                 compressionType(),
-                                 checksum(),
-                                 key() == null ? 0 : key().limit(),
-                                 value() == null ? 0 : value().limit());
-    }
-
-    public boolean equals(Object other) {
-        if (this == other)
-            return true;
-        if (other == null)
-            return false;
-        if (!other.getClass().equals(Record.class))
-            return false;
-        Record record = (Record) other;
-        return this.buffer.equals(record.buffer);
-    }
-
-    public int hashCode() {
-        return buffer.hashCode();
-    }
+    boolean isCompressed();
 
     /**
-     * Get the size of this record if converted to the given format.
+     * For versions prior to 2, the record contained a timestamp type attribute. This method can be
+     * used to check whether the value of that attribute matches a particular timestamp type. For versions
+     * 2 and above, this will always be false.
      *
-     * @param toMagic The target magic version to convert to
-     * @return The size in bytes after conversion
+     * @param timestampType the timestamp type to compare
+     * @return true if the version is lower than 2 and the timestamp type matches
      */
-    public int convertedSize(byte toMagic) {
-        return recordSize(toMagic, Math.max(0, keySize()), Math.max(0, valueSize()));
-    }
+    boolean hasTimestampType(TimestampType timestampType);
 
     /**
-     * Convert this record to another message format.
+     * Check whether this is a control record (i.e. whether the control bit is set in the record attributes).
+     * For magic versions prior to 2, this is always false.
      *
-     * @param toMagic The target magic version to convert to
-     * @param upconvertTimestampType The timestamp type to use if up-converting from magic 0, ignored if
-     *                               down-converting or if no conversion is needed
-     * @return A new record instance with a freshly allocated ByteBuffer.
+     * @return Whether this is a control record
      */
-    public Record convert(byte toMagic, TimestampType upconvertTimestampType) {
-        byte magic = magic();
-        if (toMagic == magic)
-            return this;
+    boolean isControlRecord();
 
-        final TimestampType timestampType;
-        if (magic == Record.MAGIC_VALUE_V0) {
-            if (upconvertTimestampType == TimestampType.NO_TIMESTAMP_TYPE)
-                throw new IllegalArgumentException("Cannot up-convert using timestamp type " + upconvertTimestampType);
-            timestampType = upconvertTimestampType;
-        } else {
-            timestampType = timestampType();
-        }
-
-        ByteBuffer buffer = ByteBuffer.allocate(convertedSize(toMagic));
-        convertTo(buffer, toMagic, timestamp(), timestampType);
-        buffer.rewind();
-        return new Record(buffer);
-    }
-
-    private void convertTo(ByteBuffer buffer, byte toMagic, long timestamp, TimestampType timestampType) {
-        if (compressionType() != CompressionType.NONE)
-            throw new IllegalArgumentException("Cannot use convertTo for deep conversion");
-
-        write(buffer, toMagic, timestamp, key(), value(), CompressionType.NONE, timestampType);
-    }
-
-    /**
-     * Convert this record to another message format and write the converted data to the provided outputs stream.
-     *
-     * @param out The output stream to write the converted data to
-     * @param toMagic The target magic version for conversion
-     * @param timestamp The timestamp to use in the converted record (for up-conversion)
-     * @param timestampType The timestamp type to use in the converted record (for up-conversion)
-     * @throws IOException for any IO errors writing the converted record.
-     */
-    public void convertTo(DataOutputStream out, byte toMagic, long timestamp, TimestampType timestampType) throws IOException {
-        if (compressionType() != CompressionType.NONE)
-            throw new IllegalArgumentException("Cannot use convertTo for deep conversion");
-
-        write(out, toMagic, timestamp, key(), value(), CompressionType.NONE, timestampType);
-    }
-
-    /**
-     * Create a new record instance. If the record's compression type is not none, then
-     * its value payload should be already compressed with the specified type; the constructor
-     * would always write the value payload as is and will not do the compression itself.
-     *
-     * @param magic The magic value to use
-     * @param timestamp The timestamp of the record
-     * @param key The key of the record (null, if none)
-     * @param value The record value
-     * @param compressionType The compression type used on the contents of the record (if any)
-     * @param timestampType The timestamp type to be used for this record
-     */
-    public static Record create(byte magic,
-                                long timestamp,
-                                byte[] key,
-                                byte[] value,
-                                CompressionType compressionType,
-                                TimestampType timestampType) {
-        int keySize = key == null ? 0 : key.length;
-        int valueSize = value == null ? 0 : value.length;
-        ByteBuffer buffer = ByteBuffer.allocate(recordSize(magic, keySize, valueSize));
-        write(buffer, magic, timestamp, wrapNullable(key), wrapNullable(value), compressionType, timestampType);
-        buffer.rewind();
-        return new Record(buffer);
-    }
-
-    public static Record create(long timestamp, byte[] key, byte[] value) {
-        return create(CURRENT_MAGIC_VALUE, timestamp, key, value, CompressionType.NONE, TimestampType.CREATE_TIME);
-    }
-
-    public static Record create(byte magic, long timestamp, byte[] key, byte[] value) {
-        return create(magic, timestamp, key, value, CompressionType.NONE, TimestampType.CREATE_TIME);
-    }
-
-    public static Record create(byte magic, TimestampType timestampType, long timestamp, byte[] key, byte[] value) {
-        return create(magic, timestamp, key, value, CompressionType.NONE, timestampType);
-    }
-
-    public static Record create(byte magic, long timestamp, byte[] value) {
-        return create(magic, timestamp, null, value, CompressionType.NONE, TimestampType.CREATE_TIME);
-    }
-
-    public static Record create(byte magic, byte[] key, byte[] value) {
-        return create(magic, NO_TIMESTAMP, key, value);
-    }
-
-    public static Record create(byte[] key, byte[] value) {
-        return create(NO_TIMESTAMP, key, value);
-    }
-
-    public static Record create(byte[] value) {
-        return create(CURRENT_MAGIC_VALUE, NO_TIMESTAMP, null, value, CompressionType.NONE, TimestampType.CREATE_TIME);
-    }
-
-    /**
-     * Write the header for a compressed record set in-place (i.e. assuming the compressed record data has already
-     * been written at the value offset in a wrapped record). This lets you dynamically create a compressed message
-     * set, and then go back later and fill in its size and CRC, which saves the need for copying to another buffer.
-     *
-     * @param buffer The buffer containing the compressed record data positioned at the first offset of the
-     * @param magic The magic value of the record set
-     * @param recordSize The size of the record (including record overhead)
-     * @param timestamp The timestamp of the wrapper record
-     * @param compressionType The compression type used
-     * @param timestampType The timestamp type of the wrapper record
-     */
-    public static void writeCompressedRecordHeader(ByteBuffer buffer,
-                                                   byte magic,
-                                                   int recordSize,
-                                                   long timestamp,
-                                                   CompressionType compressionType,
-                                                   TimestampType timestampType) {
-        int recordPosition = buffer.position();
-        int valueSize = recordSize - recordOverhead(magic);
-
-        // write the record header with a null value (the key is always null for the wrapper)
-        write(buffer, magic, timestamp, null, null, compressionType, timestampType);
-
-        // now fill in the value size
-        buffer.putInt(recordPosition + keyOffset(magic), valueSize);
-
-        // compute and fill the crc from the beginning of the message
-        long crc = Utils.computeChecksum(buffer, recordPosition + MAGIC_OFFSET, recordSize - MAGIC_OFFSET);
-        ByteUtils.writeUnsignedInt(buffer, recordPosition + CRC_OFFSET, crc);
-    }
-
-    private static void write(ByteBuffer buffer,
-                              byte magic,
-                              long timestamp,
-                              ByteBuffer key,
-                              ByteBuffer value,
-                              CompressionType compressionType,
-                              TimestampType timestampType) {
-        try {
-            DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
-            write(out, magic, timestamp, key, value, compressionType, timestampType);
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
 
     /**
-     * Write the record data with the given compression type and return the computed crc.
+     * Get the headers. For magic versions 1 and below, this always returns an empty array.
      *
-     * @param out The output stream to write to
-     * @param magic The magic value to be used
-     * @param timestamp The timestamp of the record
-     * @param key The record key
-     * @param value The record value
-     * @param compressionType The compression type
-     * @param timestampType The timestamp type
-     * @return the computed CRC for this record.
-     * @throws IOException for any IO errors writing to the output stream.
-     */
-    public static long write(DataOutputStream out,
-                             byte magic,
-                             long timestamp,
-                             byte[] key,
-                             byte[] value,
-                             CompressionType compressionType,
-                             TimestampType timestampType) throws IOException {
-        return write(out, magic, timestamp, wrapNullable(key), wrapNullable(value), compressionType, timestampType);
-    }
-
-    private static long write(DataOutputStream out,
-                              byte magic,
-                              long timestamp,
-                              ByteBuffer key,
-                              ByteBuffer value,
-                              CompressionType compressionType,
-                              TimestampType timestampType) throws IOException {
-        byte attributes = computeAttributes(magic, compressionType, timestampType);
-        long crc = computeChecksum(magic, attributes, timestamp, key, value);
-        write(out, magic, crc, attributes, timestamp, key, value);
-        return crc;
-    }
-
-    /**
-     * Write a record using raw fields (without validation). This should only be used in testing.
+     * @return the array of headers
      */
-    public static void write(DataOutputStream out,
-                             byte magic,
-                             long crc,
-                             byte attributes,
-                             long timestamp,
-                             byte[] key,
-                             byte[] value) throws IOException {
-        write(out, magic, crc, attributes, timestamp, wrapNullable(key), wrapNullable(value));
-    }
-
-    // Write a record to the buffer, if the record's compression type is none, then
-    // its value payload should be already compressed with the specified type
-    private static void write(DataOutputStream out,
-                              byte magic,
-                              long crc,
-                              byte attributes,
-                              long timestamp,
-                              ByteBuffer key,
-                              ByteBuffer value) throws IOException {
-        if (magic != MAGIC_VALUE_V0 && magic != MAGIC_VALUE_V1)
-            throw new IllegalArgumentException("Invalid magic value " + magic);
-        if (timestamp < 0 && timestamp != NO_TIMESTAMP)
-            throw new IllegalArgumentException("Invalid message timestamp " + timestamp);
-
-        // write crc
-        out.writeInt((int) (crc & 0xffffffffL));
-        // write magic value
-        out.writeByte(magic);
-        // write attributes
-        out.writeByte(attributes);
-
-        // maybe write timestamp
-        if (magic > 0)
-            out.writeLong(timestamp);
-
-        // write the key
-        if (key == null) {
-            out.writeInt(-1);
-        } else {
-            int size = key.remaining();
-            out.writeInt(size);
-            out.write(key.array(), key.arrayOffset(), size);
-        }
-        // write the value
-        if (value == null) {
-            out.writeInt(-1);
-        } else {
-            int size = value.remaining();
-            out.writeInt(size);
-            out.write(value.array(), value.arrayOffset(), size);
-        }
-    }
-
-    public static int recordSize(byte[] key, byte[] value) {
-        return recordSize(CURRENT_MAGIC_VALUE, key, value);
-    }
-
-    public static int recordSize(byte magic, byte[] key, byte[] value) {
-        return recordSize(magic, key == null ? 0 : key.length, value == null ? 0 : value.length);
-    }
-
-    private static int recordSize(byte magic, int keySize, int valueSize) {
-        return recordOverhead(magic) + keySize + valueSize;
-    }
-
-    // visible only for testing
-    public static byte computeAttributes(byte magic, CompressionType type, TimestampType timestampType) {
-        byte attributes = 0;
-        if (type.id > 0)
-            attributes = (byte) (attributes | (COMPRESSION_CODEC_MASK & type.id));
-        if (magic > 0)
-            return timestampType.updateAttributes(attributes);
-        return attributes;
-    }
-
-    // visible only for testing
-    public static long computeChecksum(byte magic, byte attributes, long timestamp, byte[] key, byte[] value) {
-        return computeChecksum(magic, attributes, timestamp, wrapNullable(key), wrapNullable(value));
-    }
-
-    /**
-     * Compute the checksum of the record from the attributes, key and value payloads
-     */
-    private static long computeChecksum(byte magic, byte attributes, long timestamp, ByteBuffer key, ByteBuffer value) {
-        Crc32 crc = new Crc32();
-        crc.update(magic);
-        crc.update(attributes);
-        if (magic > 0)
-            crc.updateLong(timestamp);
-        // update for the key
-        if (key == null) {
-            crc.updateInt(-1);
-        } else {
-            int size = key.remaining();
-            crc.updateInt(size);
-            crc.update(key.array(), key.arrayOffset(), size);
-        }
-        // update for the value
-        if (value == null) {
-            crc.updateInt(-1);
-        } else {
-            int size = value.remaining();
-            crc.updateInt(size);
-            crc.update(value.array(), value.arrayOffset(), size);
-        }
-        return crc.getValue();
-    }
-
-    public static int recordOverhead(byte magic) {
-        if (magic == 0)
-            return RECORD_OVERHEAD_V0;
-        return RECORD_OVERHEAD_V1;
-    }
-
-    private static int keyOffset(byte magic) {
-        if (magic == 0)
-            return KEY_OFFSET_V0;
-        return KEY_OFFSET_V1;
-    }
-
+    Header[] headers();
 }
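
For reference, the removed write() and computeChecksum() methods above document the legacy v0/v1 wire layout. The following is a minimal, self-contained sketch (not part of this patch; the class and method names are illustrative, and the crc argument is assumed to be computed separately as in the removed computeChecksum()) showing the same field order written into a ByteBuffer:

import java.nio.ByteBuffer;

// Illustrative sketch of the legacy v0/v1 record layout, mirrored from the removed write() above.
public class LegacyRecordLayoutSketch {
    // Field order: crc, magic, attributes, timestamp (magic > 0 only), key, value.
    static void writeV1(ByteBuffer out, long crc, byte attributes, long timestamp,
                        byte[] key, byte[] value) {
        out.putInt((int) (crc & 0xffffffffL)); // 4-byte CRC stored as an unsigned int
        out.put((byte) 1);                     // magic value v1
        out.put(attributes);                   // compression codec and timestamp type bits
        out.putLong(timestamp);                // only written for magic > 0
        writeNullable(out, key);               // length-prefixed key, or -1 if null
        writeNullable(out, value);             // length-prefixed value, or -1 if null
    }

    private static void writeNullable(ByteBuffer out, byte[] bytes) {
        if (bytes == null) {
            out.putInt(-1);
        } else {
            out.putInt(bytes.length);
            out.put(bytes);
        }
    }
}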

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
new file mode 100644
index 0000000..1cfb7f8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A record batch is a container for records. In old versions of the record format (versions 0 and 1),
+ * a batch always consisted of a single record if no compression was enabled, but could contain
+ * many records otherwise. Newer versions (magic versions 2 and above) will generally contain many records
+ * regardless of compression.
+ */
+public interface RecordBatch extends Iterable<Record> {
+
+    /**
+     * The "magic" values
+     */
+    byte MAGIC_VALUE_V0 = 0;
+    byte MAGIC_VALUE_V1 = 1;
+    byte MAGIC_VALUE_V2 = 2;
+
+    /**
+     * The current "magic" value
+     */
+    byte CURRENT_MAGIC_VALUE = MAGIC_VALUE_V2;
+
+    /**
+     * Timestamp value for records without a timestamp
+     */
+    long NO_TIMESTAMP = -1L;
+
+    /**
+     * Values used in the v2 record format by non-idempotent/non-transactional producers or when
+     * up-converting from an older format.
+     */
+    long NO_PRODUCER_ID = -1L;
+    short NO_PRODUCER_EPOCH = -1;
+    int NO_SEQUENCE = -1;
+
+    /**
+     * Used to indicate an unknown leader epoch, which will be the case when the record set is
+     * first created by the producer.
+     */
+    int UNKNOWN_PARTITION_LEADER_EPOCH = -1;
+
+    /**
+     * Check whether the checksum of this batch is correct.
+     *
+     * @return true if so, false otherwise
+     */
+    boolean isValid();
+
+    /**
+     * Raise an exception if the checksum is not valid.
+     */
+    void ensureValid();
+
+    /**
+     * Get the checksum of this record batch, which covers the batch header as well as all of the records.
+     *
+     * @return The 4-byte unsigned checksum represented as a long
+     */
+    long checksum();
+
+    /**
+     * Get the timestamp of this record batch. This is the max timestamp among all records contained in this batch.
+     * This value is updated during compaction.
+     *
+     * @return The max timestamp
+     */
+    long maxTimestamp();
+
+    /**
+     * Get the timestamp type of this record batch. This will be {@link TimestampType#NO_TIMESTAMP_TYPE}
+     * if the batch has magic 0.
+     *
+     * @return The timestamp type
+     */
+    TimestampType timestampType();
+
+    /**
+     * Get the first offset contained in this record batch. For magic versions prior to 2, this generally
+     * requires deep iteration and will return the offset of the first record in the record batch. For
+     * magic version 2 and above, this will return the first offset of the original record batch (i.e.
+     * prior to compaction). For non-compacted topics, the behavior is equivalent.
+     *
+     * Because this requires deep iteration for older magic versions, this method should be used with
+     * caution. Generally {@link #lastOffset()} is safer since access is efficient for all magic versions.
+     *
+     * @return The base offset of this record batch (which may or may not be the offset of the first record
+     *         as described above).
+     */
+    long baseOffset();
+
+    /**
+     * Get the last offset in this record batch (inclusive). Unlike {@link #baseOffset()}, the last offset
+     * always reflects the offset of the last record in the batch, even after compaction.
+     *
+     * @return The offset of the last record in this batch
+     */
+    long lastOffset();
+
+    /**
+     * Get the offset following this record batch (i.e. the last offset contained in this batch plus one).
+     *
+     * @return the next consecutive offset following this batch
+     */
+    long nextOffset();
+
+    /**
+     * Get the record format version of this record batch (i.e. its magic value).
+     *
+     * @return the magic byte
+     */
+    byte magic();
+
+    /**
+     * Get the producer ID (PID) for this log record batch. For older magic versions, this will return -1.
+     *
+     * @return The PID or -1 if there is none
+     */
+    long producerId();
+
+    /**
+     * Get the producer epoch for this log record batch.
+     *
+     * @return The producer epoch, or -1 if there is none
+     */
+    short producerEpoch();
+
+    /**
+     * Get the first sequence number of this record batch.
+     * @return The first sequence number or -1 if there is none
+     */
+    int baseSequence();
+
+    /**
+     * Get the last sequence number of this record batch.
+     *
+     * @return The last sequence number or -1 if there is none
+     */
+    int lastSequence();
+
+    /**
+     * Get the compression type of this record batch.
+     *
+     * @return The compression type
+     */
+    CompressionType compressionType();
+
+    /**
+     * Get the size in bytes of this batch, including the size of the records and the batch overhead.
+     * @return The size in bytes of this batch
+     */
+    int sizeInBytes();
+
+    /**
+     * Get the count if it is efficiently supported by the record format (which is only the case
+     * for magic 2 and higher).
+     *
+     * @return The number of records in the batch or null for magic versions 0 and 1.
+     */
+    Integer countOrNull();
+
+    /**
+     * Check whether this record batch is compressed.
+     * @return true if so, false otherwise
+     */
+    boolean isCompressed();
+
+    /**
+     * Write this record batch into a buffer.
+     * @param buffer The buffer to write the batch to
+     */
+    void writeTo(ByteBuffer buffer);
+
+    /**
+     * Whether or not this record batch is part of a transaction.
+     * @return true if it is, false otherwise
+     */
+    boolean isTransactional();
+
+    /**
+     * Get the partition leader epoch of this record batch.
+     * @return The leader epoch or -1 if it is unknown
+     */
+    int partitionLeaderEpoch();
+
+}
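
For illustration, the batch-level accessors declared above pair with the batches() method defined on Records later in this patch. The following is a minimal sketch rather than code from the patch; in particular, the per-record offset() accessor on the new Record interface is assumed here, since that interface is not shown in this hunk:

import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;

// Illustrative sketch: dump batch-level metadata, then the records inside each batch.
public class BatchInspectionSketch {
    static void dump(Records records) {
        for (RecordBatch batch : records.batches()) {
            batch.ensureValid(); // throws if the batch checksum does not match
            System.out.printf("base=%d last=%d magic=%d producerId=%d transactional=%b%n",
                    batch.baseOffset(), batch.lastOffset(), batch.magic(),
                    batch.producerId(), batch.isTransactional());
            for (Record record : batch)                       // RecordBatch extends Iterable<Record>
                System.out.println("  record offset " + record.offset()); // offset() assumed on Record
        }
    }
}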

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java
new file mode 100644
index 0000000..88af039
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatchIterator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.AbstractIterator;
+
+import java.io.IOException;
+
+class RecordBatchIterator<T extends RecordBatch> extends AbstractIterator<T> {
+
+    private final LogInputStream<T> logInputStream;
+
+    RecordBatchIterator(LogInputStream<T> logInputStream) {
+        this.logInputStream = logInputStream;
+    }
+
+    @Override
+    protected T makeNext() {
+        try {
+            T batch = logInputStream.nextBatch();
+            if (batch == null)
+                return allDone();
+            return batch;
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+}
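
RecordBatchIterator simply pulls batches from a LogInputStream until nextBatch() returns null and rethrows IOExceptions as KafkaException. A toy in-memory LogInputStream makes that contract concrete; this is a sketch only, assuming LogInputStream exposes just the nextBatch() method called above and that the class lives in the same package, since RecordBatchIterator is package-private:

package org.apache.kafka.common.record;

import java.util.Iterator;
import java.util.Queue;

// Illustrative sketch: a LogInputStream that serves pre-built batches from an in-memory queue.
class QueueLogInputStream implements LogInputStream<RecordBatch> {
    private final Queue<RecordBatch> batches;

    QueueLogInputStream(Queue<RecordBatch> batches) {
        this.batches = batches;
    }

    @Override
    public RecordBatch nextBatch() {
        return batches.poll(); // null when exhausted, which ends the iteration
    }

    // Usage: wrap in the iterator defined above.
    static Iterator<RecordBatch> iterate(Queue<RecordBatch> batches) {
        return new RecordBatchIterator<>(new QueueLogInputStream(batches));
    }
}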

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 5e0490d..6a4d1a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -20,13 +20,22 @@ import java.io.IOException;
 import java.nio.channels.GatheringByteChannel;
 
 /**
- * Interface for accessing the records contained in a log. The log itself is represented as a sequence of log entries.
- * Each log entry consists of an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}.
- * If the entry is not compressed, then each entry will have only the shallow record contained inside it. If it is
- * compressed, the entry contains "deep" records, which are packed into the value field of the shallow record. To iterate
- * over the shallow records, use {@link #shallowEntries()}; for the deep records, use {@link #deepEntries()}. Note
- * that the deep iterator handles both compressed and non-compressed entries: if the entry is not compressed, the
- * shallow record is returned; otherwise, the shallow record is decompressed and the deep entries are returned.
+ * Interface for accessing the records contained in a log. The log itself is represented as a sequence of record
+ * batches (see {@link RecordBatch}).
+ *
+ * For magic versions 1 and below, each batch consists of an 8 byte offset, a 4 byte record size, and a "shallow"
+ * {@link Record record}. If the batch is not compressed, then each batch will have only the shallow record contained
+ * inside it. If it is compressed, the batch contains "deep" records, which are packed into the value field of the
+ * shallow record. To iterate over the shallow batches, use {@link #batches()}; for the deep records, use
+ * {@link #records()}. Note that the deep iterator handles both compressed and non-compressed batches: if the batch is
+ * not compressed, the shallow record is returned; otherwise, the shallow batch is decompressed and the deep records
+ * are returned.
+ *
+ * For magic version 2, every batch contains one or more log records, regardless of compression. You can iterate
+ * over the batches directly using {@link #batches()}. Records can be iterated either directly from an individual
+ * batch or through {@link #records()}. Just as in previous versions, iterating over the records typically involves
+ * decompression and should therefore be used with caution.
+ *
  * See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
  */
 public interface Records {
@@ -37,6 +46,10 @@ public interface Records {
     int SIZE_LENGTH = 4;
     int LOG_OVERHEAD = SIZE_OFFSET + SIZE_LENGTH;
 
+    // the magic byte is at the same offset for all current message formats, but the 4 bytes
+    // between the size and the magic depend on the version.
+    int MAGIC_OFFSET = 16;
+
     /**
      * The size of these records in bytes.
      * @return The size in bytes of the records
@@ -54,38 +67,41 @@ public interface Records {
     long writeTo(GatheringByteChannel channel, long position, int length) throws IOException;
 
     /**
-     * Get the shallow log entries in this log buffer. Note that the signature allows subclasses
-     * to return a more specific log entry type. This enables optimizations such as in-place offset
-     * assignment (see {@link ByteBufferLogInputStream.ByteBufferLogEntry}), and partial reading of
-     * record data (see {@link FileLogInputStream.FileChannelLogEntry#magic()}.
-     * @return An iterator over the shallow entries of the log
+     * Get the record batches. Note that the signature allows subclasses
+     * to return a more specific batch type. This enables optimizations such as in-place offset
+     * assignment (see for example {@link DefaultRecordBatch}), and partial reading of
+     * record data (see {@link FileLogInputStream.FileChannelRecordBatch#magic()}).
+     * @return An iterator over the record batches of the log
      */
-    Iterable<? extends LogEntry> shallowEntries();
+    Iterable<? extends RecordBatch> batches();
 
     /**
-     * Get the deep log entries (i.e. descend into compressed message sets). For the deep records,
-     * there are fewer options for optimization since the data must be decompressed before it can be
-     * returned. Hence there is little advantage in allowing subclasses to return a more specific type
-     * as we do for {@link #shallowEntries()}.
-     * @return An iterator over the deep entries of the log
+     * Check whether all batches in this buffer have a certain magic value.
+     * @param magic The magic value to check
+     * @return true if all record batches have a matching magic value, false otherwise
      */
-    Iterable<LogEntry> deepEntries();
+    boolean hasMatchingMagic(byte magic);
 
     /**
-     * Check whether all shallow entries in this buffer have a certain magic value.
-     * @param magic The magic value to check
-     * @return true if all shallow entries have a matching magic value, false otherwise
+     * Check whether this log buffer has a magic value compatible with a particular value
+     * (i.e. whether all batches contained in the buffer have a matching or lower magic).
+     * @param magic The magic version to ensure compatibility with
+     * @return true if all batches have compatible magic, false otherwise
      */
-    boolean hasMatchingShallowMagic(byte magic);
-
+    boolean hasCompatibleMagic(byte magic);
 
     /**
-     * Convert all entries in this buffer to the format passed as a parameter. Note that this requires
+     * Convert all batches in this buffer to the format passed as a parameter. Note that this requires
      * deep iteration since all of the deep records must also be converted to the desired format.
      * @param toMagic The magic value to convert to
-     * @param upconvertTimestampType The timestamp type to use if up-converting from magic 0
-     * @return A Records (which may or may not be the same instance)
+     * @return A Records instance (which may or may not be the same instance)
      */
-    Records toMessageFormat(byte toMagic, TimestampType upconvertTimestampType);
+    Records downConvert(byte toMagic);
 
+    /**
+     * Get an iterator over the records in this log. Note that this generally requires decompression,
+     * and should therefore be used with care.
+     * @return The record iterator
+     */
+    Iterable<Record> records();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
deleted file mode 100644
index 710ce3b..0000000
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.ByteBufferInputStream;
-import org.apache.kafka.common.utils.Utils;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-
-/**
- * An iterator which handles both the shallow and deep iteration of record sets.
- */
-public class RecordsIterator extends AbstractIterator<LogEntry> {
-    private final boolean shallow;
-    private final boolean ensureMatchingMagic;
-    private final int maxRecordSize;
-    private final ShallowRecordsIterator<?> shallowIter;
-    private DeepRecordsIterator innerIter;
-
-    public RecordsIterator(LogInputStream<?> logInputStream,
-                           boolean shallow,
-                           boolean ensureMatchingMagic,
-                           int maxRecordSize) {
-        this.shallowIter = new ShallowRecordsIterator<>(logInputStream);
-        this.shallow = shallow;
-        this.ensureMatchingMagic = ensureMatchingMagic;
-        this.maxRecordSize = maxRecordSize;
-    }
-
-    /**
-     * Get a shallow iterator over the given input stream.
-     * @param logInputStream The log input stream to read the entries from
-     * @param <T> The type of the log entry
-     * @return The shallow iterator.
-     */
-    public static <T extends LogEntry> Iterator<T> shallowIterator(LogInputStream<T> logInputStream) {
-        return new ShallowRecordsIterator<>(logInputStream);
-    }
-
-    @Override
-    protected LogEntry makeNext() {
-        if (innerDone()) {
-            if (!shallowIter.hasNext())
-                return allDone();
-
-            LogEntry entry = shallowIter.next();
-
-            // decide whether to go shallow or deep iteration if it is compressed
-            if (shallow || !entry.isCompressed()) {
-                return entry;
-            } else {
-                // init the inner iterator with the value payload of the message,
-                // which will de-compress the payload to a set of messages;
-                // since we assume nested compression is not allowed, the deep iterator
-                // would not try to further decompress underlying messages
-                // There will be at least one element in the inner iterator, so we don't
-                // need to call hasNext() here.
-                innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, maxRecordSize);
-                return innerIter.next();
-            }
-        } else {
-            return innerIter.next();
-        }
-    }
-
-    private boolean innerDone() {
-        return innerIter == null || !innerIter.hasNext();
-    }
-
-    private static final class DataLogInputStream implements LogInputStream<LogEntry> {
-        private final DataInputStream stream;
-        protected final int maxMessageSize;
-
-        DataLogInputStream(DataInputStream stream, int maxMessageSize) {
-            this.stream = stream;
-            this.maxMessageSize = maxMessageSize;
-        }
-
-        public LogEntry nextEntry() throws IOException {
-            try {
-                long offset = stream.readLong();
-                int size = stream.readInt();
-                if (size < Record.RECORD_OVERHEAD_V0)
-                    throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
-                if (size > maxMessageSize)
-                    throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
-
-                byte[] recordBuffer = new byte[size];
-                stream.readFully(recordBuffer, 0, size);
-                ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
-                return LogEntry.create(offset, new Record(buf));
-            } catch (EOFException e) {
-                return null;
-            }
-        }
-    }
-
-    private static class ShallowRecordsIterator<T extends LogEntry> extends AbstractIterator<T> {
-        private final LogInputStream<T> logStream;
-
-        public ShallowRecordsIterator(LogInputStream<T> logStream) {
-            this.logStream = logStream;
-        }
-
-        @Override
-        protected T makeNext() {
-            try {
-                T entry = logStream.nextEntry();
-                if (entry == null)
-                    return allDone();
-                return entry;
-            } catch (IOException e) {
-                throw new KafkaException(e);
-            }
-        }
-    }
-
-    public static class DeepRecordsIterator extends AbstractIterator<LogEntry> {
-        private final ArrayDeque<LogEntry> logEntries;
-        private final long absoluteBaseOffset;
-        private final byte wrapperMagic;
-
-        public DeepRecordsIterator(LogEntry wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) {
-            Record wrapperRecord = wrapperEntry.record();
-            this.wrapperMagic = wrapperRecord.magic();
-
-            CompressionType compressionType = wrapperRecord.compressionType();
-            ByteBuffer wrapperValue = wrapperRecord.value();
-            if (wrapperValue == null)
-                throw new InvalidRecordException("Found invalid compressed record set with null value");
-
-            DataInputStream stream = new DataInputStream(compressionType.wrapForInput(new ByteBufferInputStream(wrapperValue),
-                    wrapperRecord.magic()));
-            LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize);
-
-            long wrapperRecordOffset = wrapperEntry.offset();
-            long wrapperRecordTimestamp = wrapperRecord.timestamp();
-            this.logEntries = new ArrayDeque<>();
-
-            // If relative offset is used, we need to decompress the entire message first to compute
-            // the absolute offset. For simplicity and because it's a format that is on its way out, we
-            // do the same for message format version 0
-            try {
-                while (true) {
-                    LogEntry logEntry = logStream.nextEntry();
-                    if (logEntry == null)
-                        break;
-
-                    Record record = logEntry.record();
-                    byte magic = record.magic();
-
-                    if (ensureMatchingMagic && magic != wrapperMagic)
-                        throw new InvalidRecordException("Compressed message magic does not match wrapper magic");
-
-                    if (magic > Record.MAGIC_VALUE_V0) {
-                        Record recordWithTimestamp = new Record(
-                                record.buffer(),
-                                wrapperRecordTimestamp,
-                                wrapperRecord.timestampType()
-                        );
-                        logEntry = LogEntry.create(logEntry.offset(), recordWithTimestamp);
-                    }
-                    logEntries.addLast(logEntry);
-                }
-
-                if (logEntries.isEmpty())
-                    throw new InvalidRecordException("Found invalid compressed record set with no inner records");
-
-                if (wrapperMagic > Record.MAGIC_VALUE_V0)
-                    this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
-                else
-                    this.absoluteBaseOffset = -1;
-            } catch (IOException e) {
-                throw new KafkaException(e);
-            } finally {
-                Utils.closeQuietly(stream, "records iterator stream");
-            }
-        }
-
-        @Override
-        protected LogEntry makeNext() {
-            if (logEntries.isEmpty())
-                return allDone();
-
-            LogEntry entry = logEntries.remove();
-
-            // Convert offset to absolute offset if needed.
-            if (absoluteBaseOffset >= 0) {
-                long absoluteOffset = absoluteBaseOffset + entry.offset();
-                entry = LogEntry.create(absoluteOffset, entry.record());
-            }
-
-            if (entry.isCompressed())
-                throw new InvalidRecordException("Inner messages must not be compressed");
-
-            return entry;
-        }
-    }
-
-}
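
The deleted DeepRecordsIterator above is where relative inner offsets were mapped back to absolute offsets: the wrapper record carries the absolute offset of the last inner record, so the base is the wrapper offset minus the last relative offset. A standalone sketch of just that arithmetic (illustrative names, same formula as the removed code):

// Illustrative sketch of the offset math used by the deleted DeepRecordsIterator.
public class RelativeOffsetSketch {
    // wrapperOffset: absolute offset assigned to the compressed wrapper record
    // innerOffsets:  relative offsets of the inner records, e.g. {0, 1, 2}
    static long[] toAbsolute(long wrapperOffset, long[] innerOffsets) {
        long lastRelative = innerOffsets[innerOffsets.length - 1];
        long absoluteBase = wrapperOffset - lastRelative;    // e.g. 102 - 2 = 100
        long[] absolute = new long[innerOffsets.length];
        for (int i = 0; i < innerOffsets.length; i++)
            absolute[i] = absoluteBase + innerOffsets[i];    // yields {100, 101, 102}
        return absolute;
    }
}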

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java b/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
new file mode 100644
index 0000000..3a1c04c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/SimpleRecord.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * High-level representation of a Kafka record. This is useful when building record sets to
+ * avoid depending on a specific magic version.
+ */
+public class SimpleRecord {
+    private final ByteBuffer key;
+    private final ByteBuffer value;
+    private final long timestamp;
+    private final Header[] headers;
+
+    public SimpleRecord(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
+        Objects.requireNonNull(headers, "Headers must be non-null");
+        this.key = key;
+        this.value = value;
+        this.timestamp = timestamp;
+        this.headers = headers;
+    }
+
+    public SimpleRecord(long timestamp, byte[] key, byte[] value, Header[] headers) {
+        this(timestamp, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
+    }
+
+    public SimpleRecord(long timestamp, ByteBuffer key, ByteBuffer value) {
+        this(timestamp, key, value, Record.EMPTY_HEADERS);
+    }
+
+    public SimpleRecord(long timestamp, byte[] key, byte[] value) {
+        this(timestamp, Utils.wrapNullable(key), Utils.wrapNullable(value));
+    }
+
+    public SimpleRecord(long timestamp, byte[] value) {
+        this(timestamp, null, value);
+    }
+
+    public SimpleRecord(byte[] value) {
+        this(RecordBatch.NO_TIMESTAMP, null, value);
+    }
+
+    public SimpleRecord(byte[] key, byte[] value) {
+        this(RecordBatch.NO_TIMESTAMP, key, value);
+    }
+
+    public SimpleRecord(Record record) {
+        this(record.timestamp(), record.key(), record.value(), record.headers());
+    }
+
+    public ByteBuffer key() {
+        return key;
+    }
+
+    public ByteBuffer value() {
+        return value;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public Header[] headers() {
+        return headers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        SimpleRecord that = (SimpleRecord) o;
+        return timestamp == that.timestamp &&
+                (key == null ? that.key == null : key.equals(that.key)) &&
+                (value == null ? that.value == null : value.equals(that.value)) &&
+                Arrays.equals(headers, that.headers);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = key != null ? key.hashCode() : 0;
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+        result = 31 * result + Arrays.hashCode(headers);
+        return result;
+    }
+}
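
SimpleRecord lets callers describe records without committing to a magic version; appending them into an actual batch goes through MemoryRecordsBuilder, which is outside this hunk. A short construction sketch using only the overloads defined above:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.record.SimpleRecord;

// Illustrative sketch: common SimpleRecord construction patterns.
public class SimpleRecordSketch {
    static SimpleRecord[] sampleRecords() {
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        byte[] value = "value".getBytes(StandardCharsets.UTF_8);
        return new SimpleRecord[] {
            new SimpleRecord(value),                                  // no key; timestamp defaults to NO_TIMESTAMP
            new SimpleRecord(key, value),                             // keyed record, still no explicit timestamp
            new SimpleRecord(System.currentTimeMillis(), key, value)  // explicit create-time timestamp
        };
    }
}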

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
index 9440668..becde9d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
@@ -32,19 +32,6 @@ public enum TimestampType {
         this.name = name;
     }
 
-    public byte updateAttributes(byte attributes) {
-        if (this == NO_TIMESTAMP_TYPE)
-            throw new IllegalArgumentException("Cannot use NO_TIMESTAMP_TYPE in attributes");
-
-        return this == CREATE_TIME ?
-            (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK);
-    }
-
-    public static TimestampType forAttributes(byte attributes) {
-        int timestampType = (attributes & Record.TIMESTAMP_TYPE_MASK) >> Record.TIMESTAMP_TYPE_ATTRIBUTE_OFFSET;
-        return timestampType == 0 ? CREATE_TIME : LOG_APPEND_TIME;
-    }
-
     public static TimestampType forName(String name) {
         for (TimestampType t : values())
             if (t.name.equals(name))

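The removed methods above are the last place the legacy v0/v1 attributes byte was interpreted for the timestamp type: a single bit selected by Record.TIMESTAMP_TYPE_MASK, clear for CREATE_TIME and set for LOG_APPEND_TIME. The sketch below restates that decoding with local constants; the mask value 0x08 and shift 3 are assumptions about the historical format rather than values shown in this diff:

import org.apache.kafka.common.record.TimestampType;

// Illustrative sketch mirroring the removed forAttributes(); constants are assumed, not from this diff.
public class LegacyTimestampTypeSketch {
    private static final int TIMESTAMP_TYPE_MASK = 0x08;          // assumed: bit 3 of the attributes byte
    private static final int TIMESTAMP_TYPE_ATTRIBUTE_OFFSET = 3; // assumed shift for that bit

    static TimestampType forAttributes(byte attributes) {
        int bit = (attributes & TIMESTAMP_TYPE_MASK) >> TIMESTAMP_TYPE_ATTRIBUTE_OFFSET;
        return bit == 0 ? TimestampType.CREATE_TIME : TimestampType.LOG_APPEND_TIME;
    }
}
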
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 7cfc54f..7dc3b62 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -46,6 +46,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             return desiredVersion == null ? apiKey.latestVersion() : desiredVersion;
         }
 
+        public Short desiredVersion() {
+            return desiredVersion;
+        }
+
         public T build() {
             return build(desiredOrLatestVersion());
         }

