kafka-commits mailing list archives

From ij...@apache.org
Subject [09/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
Date Fri, 24 Mar 2017 19:44:02 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 0a333c5..b0dcebf 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -18,7 +18,7 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.network.TransportLayer;
-import org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
 import org.apache.kafka.common.utils.Utils;
 
 import java.io.Closeable;
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -41,14 +42,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
     private final int start;
     private final int end;
 
-    private final Iterable<FileChannelLogEntry> shallowEntries;
-
-    private final Iterable<LogEntry> deepEntries = new Iterable<LogEntry>() {
-        @Override
-        public Iterator<LogEntry> iterator() {
-            return deepIterator();
-        }
-    };
+    private final Iterable<FileLogInputStream.FileChannelRecordBatch> batches;
 
     // mutable state
     private final AtomicInteger size;
@@ -83,7 +77,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
             channel.position(limit);
         }
 
-        shallowEntries = shallowEntriesFrom(start);
+        batches = batchesFrom(start);
     }
 
     @Override
@@ -108,10 +102,10 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     /**
-     * Read log entries into the given buffer until there are no bytes remaining in the buffer or the end of the file
+     * Read log batches into the given buffer until there are no bytes remaining in the buffer or the end of the file
      * is reached.
      *
-     * @param buffer The buffer to write the entries to
+     * @param buffer The buffer to write the batches to
      * @param position Position in the buffer to read from
      * @return The same buffer
      * @throws IOException If an I/O error occurs, see {@link FileChannel#read(ByteBuffer, long)} for details on the
@@ -151,7 +145,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     /**
-     * Append log entries to the buffer
+     * Append log batches to the buffer
      * @param records The records to append
      * @return the number of bytes written to the underlying file
      */
@@ -237,6 +231,23 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     @Override
+    public Records downConvert(byte toMagic) {
+        List<? extends RecordBatch> batches = Utils.toList(batches().iterator());
+        if (batches.isEmpty()) {
+            // This indicates that the message is too large, which means that the buffer is not large
+            // enough to hold a full record batch. We just return all the bytes in the file message set.
+            // Even though the message set does not have the right format version, we expect old clients
+            // to raise an error to the user after reading the message size and seeing that there
+            // are not enough available bytes in the response to read the full message. Note that this is
+            // only possible prior to KIP-74, after which the broker was changed to always return at least
+            // one full message, even if it requires exceeding the max fetch size requested by the client.
+            return this;
+        } else {
+            return downConvert(batches, toMagic);
+        }
+    }
+
+    @Override
     public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
         long newSize = Math.min(channel.size(), end) - start;
         int oldSize = sizeInBytes();
@@ -266,10 +277,10 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @param startingPosition The starting position in the file to begin searching from.
      */
     public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
-        for (FileChannelLogEntry entry : shallowEntriesFrom(startingPosition)) {
-            long offset = entry.offset();
+        for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
+            long offset = batch.lastOffset();
             if (offset >= targetOffset)
-                return new LogEntryPosition(offset, entry.position(), entry.sizeInBytes());
+                return new LogEntryPosition(offset, batch.position(), batch.sizeInBytes());
         }
         return null;
     }
@@ -282,18 +293,17 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @return The timestamp and offset of the message found. None, if no message is found.
      */
     public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
-        for (LogEntry shallowEntry : shallowEntriesFrom(startingPosition)) {
-            Record shallowRecord = shallowEntry.record();
-            if (shallowRecord.timestamp() >= targetTimestamp) {
+        for (RecordBatch batch : batchesFrom(startingPosition)) {
+            if (batch.maxTimestamp() >= targetTimestamp) {
                 // We found a message
-                for (LogEntry deepLogEntry : shallowEntry) {
-                    long timestamp = deepLogEntry.record().timestamp();
+                for (Record record : batch) {
+                    long timestamp = record.timestamp();
                     if (timestamp >= targetTimestamp)
-                        return new TimestampAndOffset(timestamp, deepLogEntry.offset());
+                        return new TimestampAndOffset(timestamp, record.offset());
                 }
-                throw new IllegalStateException(String.format("The message set (max timestamp = %s, max offset = %s" +
-                        " should contain target timestamp %s, but does not.", shallowRecord.timestamp(),
-                        shallowEntry.offset(), targetTimestamp));
+                throw new IllegalStateException(String.format("The message set (max timestamp = %s, max offset = %s)" +
+                        " should contain target timestamp %s but it does not.", batch.maxTimestamp(),
+                        batch.lastOffset(), targetTimestamp));
             }
         }
         return null;
@@ -305,75 +315,60 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @return The largest timestamp of the messages after the given position.
      */
     public TimestampAndOffset largestTimestampAfter(int startingPosition) {
-        long maxTimestamp = Record.NO_TIMESTAMP;
+        long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         long offsetOfMaxTimestamp = -1L;
 
-        for (LogEntry shallowEntry : shallowEntriesFrom(startingPosition)) {
-            long timestamp = shallowEntry.record().timestamp();
+        for (RecordBatch batch : batchesFrom(startingPosition)) {
+            long timestamp = batch.maxTimestamp();
             if (timestamp > maxTimestamp) {
                 maxTimestamp = timestamp;
-                offsetOfMaxTimestamp = shallowEntry.offset();
+                offsetOfMaxTimestamp = batch.lastOffset();
             }
         }
         return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp);
     }
 
     /**
-     * Get an iterator over the shallow entries in the file. Note that the entries are
+     * Get an iterator over the record batches in the file. Note that the batches are
      * backed by the open file channel. When the channel is closed (i.e. when this instance
-     * is closed), the entries will generally no longer be readable.
-     * @return An iterator over the shallow entries
+     * is closed), the batches will generally no longer be readable.
+     * @return An iterator over the batches
      */
     @Override
-    public Iterable<FileChannelLogEntry> shallowEntries() {
-        return shallowEntries;
+    public Iterable<FileChannelRecordBatch> batches() {
+        return batches;
     }
 
     /**
-     * Get an iterator over the shallow entries, enforcing a maximum record size
+     * Get an iterator over the record batches, enforcing a maximum record size
      * @param maxRecordSize The maximum allowable size of individual records (including compressed record sets)
-     * @return An iterator over the shallow entries
+     * @return An iterator over the batches
      */
-    public Iterable<FileChannelLogEntry> shallowEntries(int maxRecordSize) {
-        return shallowEntries(maxRecordSize, start);
+    public Iterable<FileChannelRecordBatch> batches(int maxRecordSize) {
+        return batches(maxRecordSize, start);
     }
 
-    private Iterable<FileChannelLogEntry> shallowEntriesFrom(int start) {
-        return shallowEntries(Integer.MAX_VALUE, start);
+    private Iterable<FileChannelRecordBatch> batchesFrom(int start) {
+        return batches(Integer.MAX_VALUE, start);
     }
 
-    private Iterable<FileChannelLogEntry> shallowEntries(final int maxRecordSize, final int start) {
-        return new Iterable<FileChannelLogEntry>() {
+    private Iterable<FileChannelRecordBatch> batches(final int maxRecordSize, final int start) {
+        return new Iterable<FileChannelRecordBatch>() {
             @Override
-            public Iterator<FileChannelLogEntry> iterator() {
-                return shallowIterator(maxRecordSize, start);
+            public Iterator<FileChannelRecordBatch> iterator() {
+                return batchIterator(maxRecordSize, start);
             }
         };
     }
 
-    private Iterator<FileChannelLogEntry> shallowIterator(int maxRecordSize, int start) {
+    private Iterator<FileChannelRecordBatch> batchIterator(int maxRecordSize, int start) {
         final int end;
         if (isSlice)
             end = this.end;
         else
             end = this.sizeInBytes();
         FileLogInputStream inputStream = new FileLogInputStream(channel, maxRecordSize, start, end);
-        return RecordsIterator.shallowIterator(inputStream);
-    }
-
-    @Override
-    public Iterable<LogEntry> deepEntries() {
-        return deepEntries;
-    }
-
-    private Iterator<LogEntry> deepIterator() {
-        final int end;
-        if (isSlice)
-            end = this.end;
-        else
-            end = this.sizeInBytes();
-        FileLogInputStream inputStream = new FileLogInputStream(channel, Integer.MAX_VALUE, start, end);
-        return new RecordsIterator(inputStream, false, false, Integer.MAX_VALUE);
+        return new RecordBatchIterator<>(inputStream);
     }
 
     public static FileRecords open(File file,
@@ -446,14 +441,16 @@ public class FileRecords extends AbstractRecords implements Closeable {
 
         @Override
         public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
 
             LogEntryPosition that = (LogEntryPosition) o;
 
-            if (offset != that.offset) return false;
-            if (position != that.position) return false;
-            return size == that.size;
+            return offset == that.offset &&
+                    position == that.position &&
+                    size == that.size;
 
         }
 

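For readers tracking the API change in FileRecords above: a minimal sketch of the new iteration model, replacing the old shallowEntries()/deepEntries() pair with batches() and per-batch record iteration. The segment file name is made up and the single-argument FileRecords.open overload is assumed; everything else mirrors the methods visible in this patch.

    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;

    import java.io.File;
    import java.io.IOException;

    public class FileRecordsBatchExample {
        public static void main(String[] args) throws IOException {
            // Hypothetical segment file; any log segment written by the broker would do.
            File segment = new File("00000000000000000000.log");
            try (FileRecords fileRecords = FileRecords.open(segment)) {
                // batches() replaces shallowEntries(): each element is one top-level
                // record batch backed by the open file channel.
                for (RecordBatch batch : fileRecords.batches()) {
                    System.out.printf("batch: lastOffset=%d maxTimestamp=%d%n",
                            batch.lastOffset(), batch.maxTimestamp());
                    // Iterating the batch replaces the old deep iteration and
                    // decompresses the inner records when necessary.
                    for (Record record : batch) {
                        System.out.printf("  record: offset=%d timestamp=%d%n",
                                record.offset(), record.timestamp());
                    }
                }
            }
        }
    }
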
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/Header.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Header.java b/clients/src/main/java/org/apache/kafka/common/record/Header.java
new file mode 100644
index 0000000..2ca077c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/Header.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class Header {
+    private final String key;
+    private final ByteBuffer value;
+
+    public Header(String key, ByteBuffer value) {
+        Objects.requireNonNull(key, "Null header keys are not permitted");
+        this.key = key;
+        this.value = value;
+    }
+
+    public Header(String key, byte[] value) {
+        this(key, Utils.wrapNullable(value));
+    }
+
+    public String key() {
+        return key;
+    }
+
+    public ByteBuffer value() {
+        return value == null ? null : value.duplicate();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        Header header = (Header) o;
+        return (key == null ? header.key == null : key.equals(header.key)) &&
+                (value == null ? header.value == null : value.equals(header.value));
+    }
+
+    @Override
+    public int hashCode() {
+        int result = key != null ? key.hashCode() : 0;
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        return result;
+    }
+}

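A short, illustrative use of the new Header class added above; the header key and value are invented for the example:

    import org.apache.kafka.common.record.Header;

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class HeaderExample {
        public static void main(String[] args) {
            Header header = new Header("trace-id", "abc123".getBytes(StandardCharsets.UTF_8));

            // value() hands back a duplicate, so reading it does not disturb the stored buffer.
            ByteBuffer value = header.value();
            byte[] bytes = new byte[value.remaining()];
            value.get(bytes);
            System.out.println(header.key() + " = " + new String(bytes, StandardCharsets.UTF_8));

            // Null keys are rejected by the constructor; null values are allowed.
            Header nullValue = new Header("empty", (byte[]) null);
            System.out.println(nullValue.key() + " has value " + nullValue.value());
        }
    }
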
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
new file mode 100644
index 0000000..39908ef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Crc32;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.utils.Utils.wrapNullable;
+
+/**
+ * This class represents the serialized key and value along with the associated CRC and other fields
+ * of message format versions 0 and 1. Note that it is uncommon to need to access this class directly.
+ * Usually it should be accessed indirectly through the {@link Record} interface which is exposed
+ * through the {@link Records} object.
+ */
+public final class LegacyRecord {
+
+    /**
+     * The current offset and size for all the fixed-length fields
+     */
+    public static final int CRC_OFFSET = 0;
+    public static final int CRC_LENGTH = 4;
+    public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
+    public static final int MAGIC_LENGTH = 1;
+    public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
+    public static final int ATTRIBUTE_LENGTH = 1;
+    public static final int TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int TIMESTAMP_LENGTH = 8;
+    public static final int KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    public static final int KEY_SIZE_OFFSET_V1 = TIMESTAMP_OFFSET + TIMESTAMP_LENGTH;
+    public static final int KEY_SIZE_LENGTH = 4;
+    public static final int KEY_OFFSET_V0 = KEY_SIZE_OFFSET_V0 + KEY_SIZE_LENGTH;
+    public static final int KEY_OFFSET_V1 = KEY_SIZE_OFFSET_V1 + KEY_SIZE_LENGTH;
+    public static final int VALUE_SIZE_LENGTH = 4;
+
+    /**
+     * The size for the record header
+     */
+    public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
+
+    /**
+     * The amount of overhead bytes in a record
+     */
+    public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+
+    /**
+     * The amount of overhead bytes in a record
+     */
+    public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+
+    /**
+     * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
+     * compression
+     */
+    private static final int COMPRESSION_CODEC_MASK = 0x07;
+
+    /**
+     * Specify the mask of timestamp type: 0 for CreateTime, 1 for LogAppendTime.
+     */
+    private static final byte TIMESTAMP_TYPE_MASK = 0x08;
+
+    /**
+     * Timestamp value for records without a timestamp
+     */
+    public static final long NO_TIMESTAMP = -1L;
+
+    private final ByteBuffer buffer;
+    private final Long wrapperRecordTimestamp;
+    private final TimestampType wrapperRecordTimestampType;
+
+    public LegacyRecord(ByteBuffer buffer) {
+        this(buffer, null, null);
+    }
+
+    public LegacyRecord(ByteBuffer buffer, Long wrapperRecordTimestamp, TimestampType wrapperRecordTimestampType) {
+        this.buffer = buffer;
+        this.wrapperRecordTimestamp = wrapperRecordTimestamp;
+        this.wrapperRecordTimestampType = wrapperRecordTimestampType;
+    }
+
+    /**
+     * Compute the checksum of the record from the record contents
+     */
+    public long computeChecksum() {
+        return Crc32.crc32(buffer, MAGIC_OFFSET, buffer.limit() - MAGIC_OFFSET);
+    }
+
+    /**
+     * Retrieve the previously computed CRC for this record
+     */
+    public long checksum() {
+        return ByteUtils.readUnsignedInt(buffer, CRC_OFFSET);
+    }
+
+    /**
+     * Returns true if the crc stored with the record matches the crc computed off the record contents
+     */
+    public boolean isValid() {
+        return sizeInBytes() >= RECORD_OVERHEAD_V0 && checksum() == computeChecksum();
+    }
+
+    public Long wrapperRecordTimestamp() {
+        return wrapperRecordTimestamp;
+    }
+
+    public TimestampType wrapperRecordTimestampType() {
+        return wrapperRecordTimestampType;
+    }
+
+    /**
+     * Throw an InvalidRecordException if isValid is false for this record
+     */
+    public void ensureValid() {
+        if (sizeInBytes() < RECORD_OVERHEAD_V0)
+            throw new InvalidRecordException("Record is corrupt (crc could not be retrieved as the record is too "
+                    + "small, size = " + sizeInBytes() + ")");
+
+        if (!isValid())
+            throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum()
+                    + ", computed crc = " + computeChecksum() + ")");
+    }
+
+    /**
+     * The complete serialized size of this record in bytes (including crc, header attributes, etc), but
+     * excluding the log overhead (offset and record size).
+     * @return the size in bytes
+     */
+    public int sizeInBytes() {
+        return buffer.limit();
+    }
+
+    /**
+     * The length of the key in bytes
+     * @return the size in bytes of the key (0 if the key is null)
+     */
+    public int keySize() {
+        if (magic() == RecordBatch.MAGIC_VALUE_V0)
+            return buffer.getInt(KEY_SIZE_OFFSET_V0);
+        else
+            return buffer.getInt(KEY_SIZE_OFFSET_V1);
+    }
+
+    /**
+     * Does the record have a key?
+     * @return true if so, false otherwise
+     */
+    public boolean hasKey() {
+        return keySize() >= 0;
+    }
+
+    /**
+     * The position where the value size is stored
+     */
+    private int valueSizeOffset() {
+        if (magic() == RecordBatch.MAGIC_VALUE_V0)
+            return KEY_OFFSET_V0 + Math.max(0, keySize());
+        else
+            return KEY_OFFSET_V1 + Math.max(0, keySize());
+    }
+
+    /**
+     * The length of the value in bytes
+     * @return the size in bytes of the value (0 if the value is null)
+     */
+    public int valueSize() {
+        return buffer.getInt(valueSizeOffset());
+    }
+
+    /**
+     * Check whether the value field of this record is null.
+     * @return true if the value is null, false otherwise
+     */
+    public boolean hasNullValue() {
+        return valueSize() < 0;
+    }
+
+    /**
+     * The magic value (i.e. message format version) of this record
+     * @return the magic value
+     */
+    public byte magic() {
+        return buffer.get(MAGIC_OFFSET);
+    }
+
+    /**
+     * The attributes stored with this record
+     * @return the attributes
+     */
+    public byte attributes() {
+        return buffer.get(ATTRIBUTES_OFFSET);
+    }
+
+    /**
+     * When magic value is greater than 0, the timestamp of a record is determined in the following way:
+     * 1. wrapperRecordTimestampType = null and wrapperRecordTimestamp is null - Uncompressed message, timestamp is in the message.
+     * 2. wrapperRecordTimestampType = LOG_APPEND_TIME and wrapperRecordTimestamp is not null - Compressed message using LOG_APPEND_TIME
+     * 3. wrapperRecordTimestampType = CREATE_TIME and wrapperRecordTimestamp is not null - Compressed message using CREATE_TIME
+     *
+     * @return the timestamp as determined above
+     */
+    public long timestamp() {
+        if (magic() == RecordBatch.MAGIC_VALUE_V0)
+            return RecordBatch.NO_TIMESTAMP;
+        else {
+            // case 2
+            if (wrapperRecordTimestampType == TimestampType.LOG_APPEND_TIME && wrapperRecordTimestamp != null)
+                return wrapperRecordTimestamp;
+            // Case 1, 3
+            else
+                return buffer.getLong(TIMESTAMP_OFFSET);
+        }
+    }
+
+    /**
+     * Get the timestamp type of the record.
+     *
+     * @return The timestamp type or {@link TimestampType#NO_TIMESTAMP_TYPE} if the magic is 0.
+     */
+    public TimestampType timestampType() {
+        return timestampType(magic(), wrapperRecordTimestampType, attributes());
+    }
+
+    /**
+     * The compression type used with this record
+     */
+    public CompressionType compressionType() {
+        return CompressionType.forId(buffer.get(ATTRIBUTES_OFFSET) & COMPRESSION_CODEC_MASK);
+    }
+
+    /**
+     * A ByteBuffer containing the value of this record
+     * @return the value or null if the value for this record is null
+     */
+    public ByteBuffer value() {
+        return Utils.sizeDelimited(buffer, valueSizeOffset());
+    }
+
+    /**
+     * A ByteBuffer containing the message key
+     * @return the buffer or null if the key for this record is null
+     */
+    public ByteBuffer key() {
+        if (magic() == RecordBatch.MAGIC_VALUE_V0)
+            return Utils.sizeDelimited(buffer, KEY_SIZE_OFFSET_V0);
+        else
+            return Utils.sizeDelimited(buffer, KEY_SIZE_OFFSET_V1);
+    }
+
+    /**
+     * Get the underlying buffer backing this record instance.
+     *
+     * @return the buffer
+     */
+    public ByteBuffer buffer() {
+        return this.buffer;
+    }
+
+    public String toString() {
+        if (magic() > 0)
+            return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, %s = %d, key = %d bytes, value = %d bytes)",
+                                 magic(),
+                                 attributes(),
+                                 compressionType(),
+                                 checksum(),
+                                 timestampType(),
+                                 timestamp(),
+                                 key() == null ? 0 : key().limit(),
+                                 value() == null ? 0 : value().limit());
+        else
+            return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
+                                 magic(),
+                                 attributes(),
+                                 compressionType(),
+                                 checksum(),
+                                 key() == null ? 0 : key().limit(),
+                                 value() == null ? 0 : value().limit());
+    }
+
+    public boolean equals(Object other) {
+        if (this == other)
+            return true;
+        if (other == null)
+            return false;
+        if (!other.getClass().equals(LegacyRecord.class))
+            return false;
+        LegacyRecord record = (LegacyRecord) other;
+        return this.buffer.equals(record.buffer);
+    }
+
+    public int hashCode() {
+        return buffer.hashCode();
+    }
+
+    /**
+     * Create a new record instance. If the record's compression type is not none, then
+     * its value payload should be already compressed with the specified type; the constructor
+     * would always write the value payload as is and will not do the compression itself.
+     *
+     * @param magic The magic value to use
+     * @param timestamp The timestamp of the record
+     * @param key The key of the record (null, if none)
+     * @param value The record value
+     * @param compressionType The compression type used on the contents of the record (if any)
+     * @param timestampType The timestamp type to be used for this record
+     */
+    public static LegacyRecord create(byte magic,
+                                      long timestamp,
+                                      byte[] key,
+                                      byte[] value,
+                                      CompressionType compressionType,
+                                      TimestampType timestampType) {
+        int keySize = key == null ? 0 : key.length;
+        int valueSize = value == null ? 0 : value.length;
+        ByteBuffer buffer = ByteBuffer.allocate(recordSize(magic, keySize, valueSize));
+        write(buffer, magic, timestamp, wrapNullable(key), wrapNullable(value), compressionType, timestampType);
+        buffer.rewind();
+        return new LegacyRecord(buffer);
+    }
+
+    public static LegacyRecord create(byte magic, long timestamp, byte[] key, byte[] value) {
+        return create(magic, timestamp, key, value, CompressionType.NONE, TimestampType.CREATE_TIME);
+    }
+
+    /**
+     * Write the header for a compressed record set in-place (i.e. assuming the compressed record data has already
+     * been written at the value offset in a wrapped record). This lets you dynamically create a compressed message
+     * set, and then go back later and fill in its size and CRC, which saves the need for copying to another buffer.
+     *
+     * @param buffer The buffer containing the compressed record data positioned at the first offset of the
+     * @param magic The magic value of the record set
+     * @param recordSize The size of the record (including record overhead)
+     * @param timestamp The timestamp of the wrapper record
+     * @param compressionType The compression type used
+     * @param timestampType The timestamp type of the wrapper record
+     */
+    public static void writeCompressedRecordHeader(ByteBuffer buffer,
+                                                   byte magic,
+                                                   int recordSize,
+                                                   long timestamp,
+                                                   CompressionType compressionType,
+                                                   TimestampType timestampType) {
+        int recordPosition = buffer.position();
+        int valueSize = recordSize - recordOverhead(magic);
+
+        // write the record header with a null value (the key is always null for the wrapper)
+        write(buffer, magic, timestamp, null, null, compressionType, timestampType);
+        buffer.position(recordPosition);
+
+        // now fill in the value size
+        buffer.putInt(recordPosition + keyOffset(magic), valueSize);
+
+        // compute and fill the crc from the beginning of the message
+        long crc = Crc32.crc32(buffer, MAGIC_OFFSET, recordSize - MAGIC_OFFSET);
+        ByteUtils.writeUnsignedInt(buffer, recordPosition + CRC_OFFSET, crc);
+    }
+
+    private static void write(ByteBuffer buffer,
+                              byte magic,
+                              long timestamp,
+                              ByteBuffer key,
+                              ByteBuffer value,
+                              CompressionType compressionType,
+                              TimestampType timestampType) {
+        try {
+            DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
+            write(out, magic, timestamp, key, value, compressionType, timestampType);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Write the record data with the given compression type and return the computed crc.
+     *
+     * @param out The output stream to write to
+     * @param magic The magic value to be used
+     * @param timestamp The timestamp of the record
+     * @param key The record key
+     * @param value The record value
+     * @param compressionType The compression type
+     * @param timestampType The timestamp type
+     * @return the computed CRC for this record.
+     * @throws IOException for any IO errors writing to the output stream.
+     */
+    public static long write(DataOutputStream out,
+                             byte magic,
+                             long timestamp,
+                             byte[] key,
+                             byte[] value,
+                             CompressionType compressionType,
+                             TimestampType timestampType) throws IOException {
+        return write(out, magic, timestamp, wrapNullable(key), wrapNullable(value), compressionType, timestampType);
+    }
+
+    public static long write(DataOutputStream out,
+                             byte magic,
+                             long timestamp,
+                             ByteBuffer key,
+                             ByteBuffer value,
+                             CompressionType compressionType,
+                             TimestampType timestampType) throws IOException {
+        byte attributes = computeAttributes(magic, compressionType, timestampType);
+        long crc = computeChecksum(magic, attributes, timestamp, key, value);
+        write(out, magic, crc, attributes, timestamp, key, value);
+        return crc;
+    }
+
+    /**
+     * Write a record using raw fields (without validation). This should only be used in testing.
+     */
+    public static void write(DataOutputStream out,
+                             byte magic,
+                             long crc,
+                             byte attributes,
+                             long timestamp,
+                             byte[] key,
+                             byte[] value) throws IOException {
+        write(out, magic, crc, attributes, timestamp, wrapNullable(key), wrapNullable(value));
+    }
+
+    // Write a record to the buffer, if the record's compression type is none, then
+    // its value payload should be already compressed with the specified type
+    private static void write(DataOutputStream out,
+                              byte magic,
+                              long crc,
+                              byte attributes,
+                              long timestamp,
+                              ByteBuffer key,
+                              ByteBuffer value) throws IOException {
+        if (magic != RecordBatch.MAGIC_VALUE_V0 && magic != RecordBatch.MAGIC_VALUE_V1)
+            throw new IllegalArgumentException("Invalid magic value " + magic);
+        if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
+            throw new IllegalArgumentException("Invalid message timestamp " + timestamp);
+
+        // write crc
+        out.writeInt((int) (crc & 0xffffffffL));
+        // write magic value
+        out.writeByte(magic);
+        // write attributes
+        out.writeByte(attributes);
+
+        // maybe write timestamp
+        if (magic > RecordBatch.MAGIC_VALUE_V0)
+            out.writeLong(timestamp);
+
+        // write the key
+        if (key == null) {
+            out.writeInt(-1);
+        } else {
+            int size = key.remaining();
+            out.writeInt(size);
+            Utils.writeTo(out, key, size);
+        }
+        // write the value
+        if (value == null) {
+            out.writeInt(-1);
+        } else {
+            int size = value.remaining();
+            out.writeInt(size);
+            Utils.writeTo(out, value, size);
+        }
+    }
+
+    public static int recordSize(byte[] key, byte[] value) {
+        return recordSize(RecordBatch.CURRENT_MAGIC_VALUE, key, value);
+    }
+
+    public static int recordSize(byte magic, byte[] key, byte[] value) {
+        return recordSize(magic, key == null ? 0 : key.length, value == null ? 0 : value.length);
+    }
+
+    public static int recordSize(byte magic, ByteBuffer key, ByteBuffer value) {
+        return recordSize(magic, key == null ? 0 : key.limit(), value == null ? 0 : value.limit());
+    }
+
+    private static int recordSize(byte magic, int keySize, int valueSize) {
+        return recordOverhead(magic) + keySize + valueSize;
+    }
+
+    // visible only for testing
+    public static byte computeAttributes(byte magic, CompressionType type, TimestampType timestampType) {
+        byte attributes = 0;
+        if (type.id > 0)
+            attributes |= COMPRESSION_CODEC_MASK & type.id;
+        if (magic > RecordBatch.MAGIC_VALUE_V0) {
+            if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
+                throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for " +
+                        "message format v1");
+            if (timestampType == TimestampType.LOG_APPEND_TIME)
+                attributes |= TIMESTAMP_TYPE_MASK;
+        }
+        return attributes;
+    }
+
+    // visible only for testing
+    public static long computeChecksum(byte magic, byte attributes, long timestamp, byte[] key, byte[] value) {
+        return computeChecksum(magic, attributes, timestamp, wrapNullable(key), wrapNullable(value));
+    }
+
+    /**
+     * Compute the checksum of the record from the attributes, key and value payloads
+     */
+    private static long computeChecksum(byte magic, byte attributes, long timestamp, ByteBuffer key, ByteBuffer value) {
+        Crc32 crc = new Crc32();
+        crc.update(magic);
+        crc.update(attributes);
+        if (magic > RecordBatch.MAGIC_VALUE_V0)
+            crc.updateLong(timestamp);
+        // update for the key
+        if (key == null) {
+            crc.updateInt(-1);
+        } else {
+            int size = key.remaining();
+            crc.updateInt(size);
+            crc.update(key, size);
+        }
+        // update for the value
+        if (value == null) {
+            crc.updateInt(-1);
+        } else {
+            int size = value.remaining();
+            crc.updateInt(size);
+            crc.update(value, size);
+        }
+        return crc.getValue();
+    }
+
+    public static int recordOverhead(byte magic) {
+        if (magic == 0)
+            return RECORD_OVERHEAD_V0;
+        return RECORD_OVERHEAD_V1;
+    }
+
+    private static int keyOffset(byte magic) {
+        if (magic == 0)
+            return KEY_OFFSET_V0;
+        return KEY_OFFSET_V1;
+    }
+
+    public static TimestampType timestampType(byte magic, TimestampType wrapperRecordTimestampType, byte attributes) {
+        if (magic == 0)
+            return TimestampType.NO_TIMESTAMP_TYPE;
+        else if (wrapperRecordTimestampType != null)
+            return wrapperRecordTimestampType;
+        else
+            return (attributes & TIMESTAMP_TYPE_MASK) == 0 ? TimestampType.CREATE_TIME : TimestampType.LOG_APPEND_TIME;
+    }
+
+}

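A minimal sketch exercising the LegacyRecord helpers defined above for message format v1, using only the factory methods and accessors visible in this file:

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LegacyRecord;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;

    public class LegacyRecordExample {
        public static void main(String[] args) {
            // Build an uncompressed magic v1 record with a create-time timestamp.
            LegacyRecord record = LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1,
                    System.currentTimeMillis(),
                    "key".getBytes(),
                    "value".getBytes(),
                    CompressionType.NONE,
                    TimestampType.CREATE_TIME);

            // The stored CRC should match the one recomputed from the buffer contents.
            System.out.println("stored crc   = " + record.checksum());
            System.out.println("computed crc = " + record.computeChecksum());
            record.ensureValid(); // throws InvalidRecordException if they disagree

            System.out.println("timestamp type = " + record.timestampType());
            System.out.println("overhead (v1)  = " + LegacyRecord.recordOverhead(record.magic()));
            System.out.println("total size     = " + record.sizeInBytes());
        }
    }
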
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
deleted file mode 100644
index d4ffec0..0000000
--- a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Iterator;
-
-import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
-
-/**
- * An offset and record pair
- */
-public abstract class LogEntry implements Iterable<LogEntry> {
-
-    /**
-     * Get the offset of this entry. Note that if this entry contains a compressed
-     * message set, then this offset will be the last offset of the nested entries
-     * @return the last offset contained in this entry
-     */
-    public abstract long offset();
-
-    /**
-     * Get the shallow record for this log entry.
-     * @return the shallow record
-     */
-    public abstract Record record();
-
-    /**
-     * Get the first offset of the records contained in this entry. Note that this
-     * generally requires deep iteration, which requires decompression, so this should
-     * be used with caution.
-     * @return The first offset contained in this entry
-     */
-    public long firstOffset() {
-        return iterator().next().offset();
-    }
-
-    /**
-     * Get the offset following this entry (i.e. the last offset contained in this entry plus one).
-     * @return the next consecutive offset following this entry
-     */
-    public long nextOffset() {
-        return offset() + 1;
-    }
-
-    /**
-     * Get the message format version of this entry (i.e its magic value).
-     * @return the magic byte
-     */
-    public byte magic() {
-        return record().magic();
-    }
-
-    @Override
-    public String toString() {
-        return "LogEntry(" + offset() + ", " + record() + ")";
-    }
-
-    /**
-     * Get the size in bytes of this entry, including the size of the record and the log overhead.
-     * @return The size in bytes of this entry
-     */
-    public int sizeInBytes() {
-        return record().sizeInBytes() + LOG_OVERHEAD;
-    }
-
-    /**
-     * Check whether this entry contains a compressed message set.
-     * @return true if so, false otherwise
-     */
-    public boolean isCompressed() {
-        return record().compressionType() != CompressionType.NONE;
-    }
-
-    /**
-     * Write this entry into a buffer.
-     * @param buffer The buffer to write the entry to
-     */
-    public void writeTo(ByteBuffer buffer) {
-        writeHeader(buffer, offset(), record().sizeInBytes());
-        buffer.put(record().buffer().duplicate());
-    }
-
-    /**
-     * Get an iterator for the nested entries contained within this log entry. Note that
-     * if the entry is not compressed, then this method will return an iterator over the
-     * shallow entry only (i.e. this object).
-     * @return An iterator over the entries contained within this log entry
-     */
-    @Override
-    public Iterator<LogEntry> iterator() {
-        if (isCompressed())
-            return new RecordsIterator.DeepRecordsIterator(this, false, Integer.MAX_VALUE);
-        return Collections.singletonList(this).iterator();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || !(o instanceof LogEntry)) return false;
-
-        LogEntry that = (LogEntry) o;
-
-        if (offset() != that.offset()) return false;
-        Record thisRecord = record();
-        Record thatRecord = that.record();
-        return thisRecord != null ? thisRecord.equals(thatRecord) : thatRecord == null;
-    }
-
-    @Override
-    public int hashCode() {
-        long offset = offset();
-        Record record = record();
-        int result = (int) (offset ^ (offset >>> 32));
-        result = 31 * result + (record != null ? record.hashCode() : 0);
-        return result;
-    }
-
-    public static void writeHeader(ByteBuffer buffer, long offset, int size) {
-        buffer.putLong(offset);
-        buffer.putInt(size);
-    }
-
-    public static void writeHeader(DataOutputStream out, long offset, int size) throws IOException {
-        out.writeLong(offset);
-        out.writeInt(size);
-    }
-
-    private static class SimpleLogEntry extends LogEntry {
-        private final long offset;
-        private final Record record;
-
-        public SimpleLogEntry(long offset, Record record) {
-            this.offset = offset;
-            this.record = record;
-        }
-
-        @Override
-        public long offset() {
-            return offset;
-        }
-
-        @Override
-        public Record record() {
-            return record;
-        }
-
-    }
-
-    public static LogEntry create(long offset, Record record) {
-        return new SimpleLogEntry(offset, record);
-    }
-
-}

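With LogEntry removed, callers move to the RecordBatch/Record pair. A rough migration sketch against MemoryRecords (the empty buffer is purely illustrative, and the mapping comments reflect only the methods visible in this patch):

    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;

    import java.nio.ByteBuffer;

    public class LogEntryMigrationExample {
        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(0));

            for (RecordBatch batch : records.batches()) {
                // Old LogEntry API         -> new RecordBatch API
                // entry.offset()           -> batch.lastOffset()
                // entry.magic()            -> batch.magic()
                // entry.isCompressed()     -> batch.compressionType() != CompressionType.NONE
                // entry.record() (shallow) -> gone; iterate the batch for its records instead
                for (Record record : batch) {
                    System.out.println(batch.lastOffset() + ": " + record);
                }
            }
        }
    }
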
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
index d81488d..0c2bb8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
@@ -19,21 +19,25 @@ package org.apache.kafka.common.record;
 import java.io.IOException;
 
 /**
- * An abstraction between an underlying input stream and record iterators, a LogInputStream
- * returns only the shallow log entries, depending on {@link RecordsIterator.DeepRecordsIterator}
- * for the deep iteration. The generic typing allows for implementations which present only
- * a view of the log entries, which enables more efficient iteration when the record data is
- * not actually needed. See for example {@link org.apache.kafka.common.record.FileLogInputStream.FileChannelLogEntry}
- * in which the record is not brought into memory until needed.
+ * An abstraction between an underlying input stream and record iterators, a {@link LogInputStream} only returns
+ * the batches at one level. For magic values 0 and 1, this means that it can either handle iteration
+ * at the top level of the log or deep iteration within the payload of a single message, but it does not attempt
+ * to handle both. For magic value 2, this is only used for iterating over the top-level record batches (inner
+ * records do not follow the {@link RecordBatch} interface).
+ *
+ * The generic typing allows for implementations which present only a view of the log entries, which enables more
+ * efficient iteration when the record data is not actually needed. See for example
+ * {@link FileLogInputStream.FileChannelRecordBatch} in which the record is not brought into memory until needed.
+ *
  * @param <T> Type parameter of the log entry
  */
-interface LogInputStream<T extends LogEntry> {
+interface LogInputStream<T extends RecordBatch> {
 
     /**
-     * Get the next log entry from the underlying input stream.
+     * Get the next record batch from the underlying input stream.
      *
-     * @return The next log entry or null if there is none
+     * @return The next record batch or null if there is none
      * @throws IOException for any IO errors
      */
-    T nextEntry() throws IOException;
+    T nextBatch() throws IOException;
 }

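LogInputStream itself is package-private, so the snippet below uses a stand-in interface purely to illustrate the nextBatch() contract described above: each call returns the next top-level batch, and null signals that the stream is exhausted (this mirrors how the new RecordBatchIterator drains it).

    public class NextBatchContractExample {
        // Stand-in for the package-private LogInputStream<T extends RecordBatch>;
        // the real interface declares: T nextBatch() throws IOException.
        interface BatchStream<T> {
            T nextBatch();
        }

        // Pull batches until nextBatch() returns null, as the batch iterator does.
        static void drain(BatchStream<String> stream) {
            String batch;
            while ((batch = stream.nextBatch()) != null)
                System.out.println("read " + batch);
        }

        public static void main(String[] args) {
            String[] batches = {"batch-0", "batch-1", "batch-2"};
            int[] next = {0};
            drain(() -> next[0] < batches.length ? batches[next[0]++] : null);
        }
    }
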
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index c5076d6..e59d9fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import org.apache.kafka.common.record.ByteBufferLogInputStream.ByteBufferLogEntry;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
@@ -25,11 +23,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * A {@link Records} implementation backed by a ByteBuffer. This is used only for reading or
- * modifying in-place an existing buffer of log entries. To create a new buffer see {@link MemoryRecordsBuilder},
- * or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType) builder} variants.
+ * modifying in-place an existing buffer of record batches. To create a new buffer see {@link MemoryRecordsBuilder},
+ * or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType, long)} variants.
  */
 public class MemoryRecords extends AbstractRecords {
 
@@ -37,19 +36,18 @@ public class MemoryRecords extends AbstractRecords {
 
     private final ByteBuffer buffer;
 
-    private final Iterable<ByteBufferLogEntry> shallowEntries = new Iterable<ByteBufferLogEntry>() {
+    private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
         @Override
-        public Iterator<ByteBufferLogEntry> iterator() {
-            return shallowIterator();
+        public Iterator<MutableRecordBatch> iterator() {
+            return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
         }
     };
 
-    private final Iterable<LogEntry> deepEntries = deepEntries(false);
-
     private int validBytes = -1;
 
     // Construct a writable memory records
     private MemoryRecords(ByteBuffer buffer) {
+        Objects.requireNonNull(buffer, "buffer should not be null");
         this.buffer = buffer;
     }
 
@@ -98,26 +96,31 @@ public class MemoryRecords extends AbstractRecords {
             return validBytes;
 
         int bytes = 0;
-        for (LogEntry entry : shallowEntries())
-            bytes += entry.sizeInBytes();
+        for (RecordBatch batch : batches())
+            bytes += batch.sizeInBytes();
 
         this.validBytes = bytes;
         return bytes;
     }
 
+    @Override
+    public MemoryRecords downConvert(byte toMagic) {
+        return downConvert(batches(), toMagic);
+    }
+
     /**
      * Filter the records into the provided ByteBuffer.
      * @param filter The filter function
      * @param destinationBuffer The byte buffer to write the filtered records to
      * @return A FilterResult with a summary of the output (for metrics)
      */
-    public FilterResult filterTo(LogEntryFilter filter, ByteBuffer destinationBuffer) {
-        return filterTo(shallowEntries(), filter, destinationBuffer);
+    public FilterResult filterTo(RecordFilter filter, ByteBuffer destinationBuffer) {
+        return filterTo(batches(), filter, destinationBuffer);
     }
 
-    private static FilterResult filterTo(Iterable<ByteBufferLogEntry> fromShallowEntries, LogEntryFilter filter,
-                                       ByteBuffer destinationBuffer) {
-        long maxTimestamp = Record.NO_TIMESTAMP;
+    private static FilterResult filterTo(Iterable<MutableRecordBatch> batches, RecordFilter filter,
+                                         ByteBuffer destinationBuffer) {
+        long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
         int messagesRead = 0;
@@ -125,33 +128,35 @@ public class MemoryRecords extends AbstractRecords {
         int messagesRetained = 0;
         int bytesRetained = 0;
 
-        for (ByteBufferLogEntry shallowEntry : fromShallowEntries) {
-            bytesRead += shallowEntry.sizeInBytes();
+        for (MutableRecordBatch batch : batches) {
+            bytesRead += batch.sizeInBytes();
 
-            // We use the absolute offset to decide whether to retain the message or not (this is handled by the
-            // deep iterator). Because of KAFKA-4298, we have to allow for the possibility that a previous version
-            // corrupted the log by writing a compressed message set with a wrapper magic value not matching the magic
-            // of the inner messages. This will be fixed as we recopy the messages to the destination buffer.
+            // We use the absolute offset to decide whether to retain the message or not. Due to KAFKA-4298, we have to
+            // allow for the possibility that a previous version corrupted the log by writing a compressed record batch
+            // with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
+            // recopy the messages to the destination buffer.
 
-            Record shallowRecord = shallowEntry.record();
-            byte shallowMagic = shallowRecord.magic();
+            byte batchMagic = batch.magic();
             boolean writeOriginalEntry = true;
-            List<LogEntry> retainedEntries = new ArrayList<>();
+            long firstOffset = -1;
+            List<Record> retainedRecords = new ArrayList<>();
+
+            for (Record record : batch) {
+                if (firstOffset < 0)
+                    firstOffset = record.offset();
 
-            for (LogEntry deepEntry : shallowEntry) {
-                Record deepRecord = deepEntry.record();
                 messagesRead += 1;
 
-                if (filter.shouldRetain(deepEntry)) {
+                if (filter.shouldRetain(record)) {
                     // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
-                    // the corrupted entry with correct data.
-                    if (shallowMagic != deepRecord.magic())
+                    // the corrupted batch with correct data.
+                    if (!record.hasMagic(batchMagic))
                         writeOriginalEntry = false;
 
-                    if (deepEntry.offset() > maxOffset)
-                        maxOffset = deepEntry.offset();
+                    if (record.offset() > maxOffset)
+                        maxOffset = record.offset();
 
-                    retainedEntries.add(deepEntry);
+                    retainedRecords.add(record);
                 } else {
                     writeOriginalEntry = false;
                 }
@@ -159,21 +164,26 @@ public class MemoryRecords extends AbstractRecords {
 
             if (writeOriginalEntry) {
                 // There are no messages compacted out and no message format conversion, write the original message set back
-                shallowEntry.writeTo(destinationBuffer);
-                messagesRetained += retainedEntries.size();
-                bytesRetained += shallowEntry.sizeInBytes();
-
-                if (shallowRecord.timestamp() > maxTimestamp) {
-                    maxTimestamp = shallowRecord.timestamp();
-                    shallowOffsetOfMaxTimestamp = shallowEntry.offset();
+                batch.writeTo(destinationBuffer);
+                messagesRetained += retainedRecords.size();
+                bytesRetained += batch.sizeInBytes();
+                if (batch.maxTimestamp() > maxTimestamp) {
+                    maxTimestamp = batch.maxTimestamp();
+                    shallowOffsetOfMaxTimestamp = batch.lastOffset();
                 }
-            } else if (!retainedEntries.isEmpty()) {
+            } else if (!retainedRecords.isEmpty()) {
                 ByteBuffer slice = destinationBuffer.slice();
-                MemoryRecordsBuilder builder = builderWithEntries(slice, shallowRecord.timestampType(), shallowRecord.compressionType(),
-                        shallowRecord.timestamp(), retainedEntries);
+                TimestampType timestampType = batch.timestampType();
+                long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
+                MemoryRecordsBuilder builder = builder(slice, batch.magic(), batch.compressionType(), timestampType,
+                        firstOffset, logAppendTime);
+
+                for (Record record : retainedRecords)
+                    builder.append(record);
+
                 MemoryRecords records = builder.build();
                 destinationBuffer.position(destinationBuffer.position() + slice.position());
-                messagesRetained += retainedEntries.size();
+                messagesRetained += retainedRecords.size();
                 bytesRetained += records.sizeInBytes();
 
                 MemoryRecordsBuilder.RecordsInfo info = builder.info();
@@ -195,46 +205,20 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     @Override
-    public Iterable<ByteBufferLogEntry> shallowEntries() {
-        return shallowEntries;
-    }
-
-    private Iterator<ByteBufferLogEntry> shallowIterator() {
-        return RecordsIterator.shallowIterator(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
-    }
-
-    @Override
-    public Iterable<LogEntry> deepEntries() {
-        return deepEntries;
-    }
-
-    public Iterable<LogEntry> deepEntries(final boolean ensureMatchingMagic) {
-        return new Iterable<LogEntry>() {
-            @Override
-            public Iterator<LogEntry> iterator() {
-                return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
-            }
-        };
-    }
-
-    private Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
-        return new RecordsIterator(new ByteBufferLogInputStream(buffer.duplicate(), maxMessageSize), false,
-                ensureMatchingMagic, maxMessageSize);
+    public Iterable<MutableRecordBatch> batches() {
+        return batches;
     }
 
     @Override
     public String toString() {
-        Iterator<LogEntry> iter = deepEntries().iterator();
+        Iterator<Record> iter = records().iterator();
         StringBuilder builder = new StringBuilder();
         builder.append('[');
         while (iter.hasNext()) {
-            LogEntry entry = iter.next();
+            Record record = iter.next();
             builder.append('(');
-            builder.append("offset=");
-            builder.append(entry.offset());
-            builder.append(",");
             builder.append("record=");
-            builder.append(entry.record());
+            builder.append(record);
             builder.append(")");
             if (iter.hasNext())
                 builder.append(", ");
@@ -245,8 +229,10 @@ public class MemoryRecords extends AbstractRecords {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
 
         MemoryRecords that = (MemoryRecords) o;
 
@@ -258,8 +244,8 @@ public class MemoryRecords extends AbstractRecords {
         return buffer.hashCode();
     }
 
-    public interface LogEntryFilter {
-        boolean shouldRetain(LogEntry entry);
+    public interface RecordFilter {
+        boolean shouldRetain(Record record);
     }
 
     public static class FilterResult {
@@ -288,133 +274,80 @@ public class MemoryRecords extends AbstractRecords {
         }
     }
 
-    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
-                                               CompressionType compressionType,
-                                               TimestampType timestampType,
-                                               int writeLimit) {
-        return new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType, timestampType, 0L, System.currentTimeMillis(), writeLimit);
+    public static MemoryRecords readableRecords(ByteBuffer buffer) {
+        return new MemoryRecords(buffer);
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
-                                               byte magic,
                                                CompressionType compressionType,
                                                TimestampType timestampType,
-                                               long baseOffset,
-                                               long logAppendTime) {
-        return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, buffer.capacity());
+                                               int writeLimit) {
+        return new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, 0L,
+                System.currentTimeMillis(), RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
+                RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, writeLimit);
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
                                                CompressionType compressionType,
-                                               TimestampType timestampType) {
-        // use the buffer capacity as the default write limit
-        return builder(buffer, compressionType, timestampType, buffer.capacity());
+                                               TimestampType timestampType,
+                                               long baseOffset) {
+        return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, baseOffset);
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
                                                byte magic,
                                                CompressionType compressionType,
-                                               TimestampType timestampType) {
-        return builder(buffer, magic, compressionType, timestampType, 0L);
+                                               TimestampType timestampType,
+                                               long baseOffset) {
+        return builder(buffer, magic, compressionType, timestampType, baseOffset, System.currentTimeMillis());
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               byte magic,
                                                CompressionType compressionType,
                                                TimestampType timestampType,
-                                               long baseOffset) {
-        return builder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType, timestampType, baseOffset, System.currentTimeMillis());
+                                               long baseOffset,
+                                               long logAppendTime) {
+        return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
+                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE);
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
                                                byte magic,
                                                CompressionType compressionType,
                                                TimestampType timestampType,
-                                               long baseOffset) {
-        return builder(buffer, magic, compressionType, timestampType, baseOffset, System.currentTimeMillis());
-    }
-
-    public static MemoryRecords readableRecords(ByteBuffer buffer) {
-        return new MemoryRecords(buffer);
-    }
-
-    public static MemoryRecords withLogEntries(CompressionType compressionType, List<LogEntry> entries) {
-        return withLogEntries(TimestampType.CREATE_TIME, compressionType, System.currentTimeMillis(), entries);
-    }
-
-    public static MemoryRecords withLogEntries(LogEntry ... entries) {
-        return withLogEntries(CompressionType.NONE, Arrays.asList(entries));
-    }
-
-    public static MemoryRecords withRecords(CompressionType compressionType, long initialOffset, List<Record> records) {
-        return withRecords(initialOffset, TimestampType.CREATE_TIME, compressionType, System.currentTimeMillis(), records);
-    }
-
-    public static MemoryRecords withRecords(Record ... records) {
-        return withRecords(CompressionType.NONE, 0L, Arrays.asList(records));
-    }
-
-    public static MemoryRecords withRecords(long initialOffset, Record ... records) {
-        return withRecords(CompressionType.NONE, initialOffset, Arrays.asList(records));
+                                               long baseOffset,
+                                               long logAppendTime,
+                                               long pid,
+                                               short epoch,
+                                               int baseSequence) {
+        return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
+                logAppendTime, pid, epoch, baseSequence, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
+                buffer.capacity());
     }
 
-    public static MemoryRecords withRecords(CompressionType compressionType, Record ... records) {
-        return withRecords(compressionType, 0L, Arrays.asList(records));
+    public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, compressionType, records);
     }
 
-    public static MemoryRecords withRecords(TimestampType timestampType, CompressionType compressionType, Record ... records) {
-        return withRecords(0L, timestampType, compressionType, System.currentTimeMillis(), Arrays.asList(records));
+    public static MemoryRecords withRecords(byte magic, CompressionType compressionType, SimpleRecord... records) {
+        return withRecords(magic, 0L, compressionType, TimestampType.CREATE_TIME, records);
     }
 
-    public static MemoryRecords withRecords(long initialOffset,
-                                            TimestampType timestampType,
-                                            CompressionType compressionType,
-                                            long logAppendTime,
-                                            List<Record> records) {
-        return withLogEntries(timestampType, compressionType, logAppendTime, buildLogEntries(initialOffset, records));
+    public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, records);
     }
 
-    private static MemoryRecords withLogEntries(TimestampType timestampType,
-                                                CompressionType compressionType,
-                                                long logAppendTime,
-                                                List<LogEntry> entries) {
-        if (entries.isEmpty())
+    public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
+                                            TimestampType timestampType, SimpleRecord... records) {
+        if (records.length == 0)
             return MemoryRecords.EMPTY;
-        return builderWithEntries(timestampType, compressionType, logAppendTime, entries).build();
-    }
-
-    private static List<LogEntry> buildLogEntries(long initialOffset, List<Record> records) {
-        List<LogEntry> entries = new ArrayList<>();
-        for (Record record : records)
-            entries.add(LogEntry.create(initialOffset++, record));
-        return entries;
-    }
-
-    public static MemoryRecordsBuilder builderWithEntries(TimestampType timestampType,
-                                                          CompressionType compressionType,
-                                                          long logAppendTime,
-                                                          List<LogEntry> entries) {
-        ByteBuffer buffer = ByteBuffer.allocate(estimatedSize(compressionType, entries));
-        return builderWithEntries(buffer, timestampType, compressionType, logAppendTime, entries);
-    }
-
-    private static MemoryRecordsBuilder builderWithEntries(ByteBuffer buffer,
-                                                           TimestampType timestampType,
-                                                           CompressionType compressionType,
-                                                           long logAppendTime,
-                                                           List<LogEntry> entries) {
-        if (entries.isEmpty())
-            throw new IllegalArgumentException("entries must not be empty");
-
-        LogEntry firstEntry = entries.iterator().next();
-        long firstOffset = firstEntry.offset();
-        byte magic = firstEntry.record().magic();
-
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType,
-                firstOffset, logAppendTime);
-        for (LogEntry entry : entries)
-            builder.appendWithOffset(entry.offset(), entry.record());
-
-        return builder;
+        int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
+        ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+        MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset);
+        for (SimpleRecord record : records)
+            builder.append(record);
+        return builder.build();
     }
 
 }
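As a usage note for the API above: a minimal sketch of building records with the new SimpleRecord-based factories and builder overloads. It relies only on the signatures visible in this diff, except the SimpleRecord (timestamp, key, value) constructor, which is an assumption since SimpleRecord itself is not shown in this hunk.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.record.TimestampType;

    public class MemoryRecordsUsageSketch {
        public static void main(String[] args) {
            // One-shot construction: the varargs factory estimates the buffer size itself
            // (via AbstractRecords.estimateSizeInBytes) and builds a single batch.
            // SimpleRecord(timestamp, key, value) is assumed here.
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord(System.currentTimeMillis(), "k".getBytes(), "v".getBytes()));
            System.out.println(records.sizeInBytes());

            // Incremental construction against an explicit buffer, using the
            // builder(buffer, magic, compressionType, timestampType, baseOffset) overload.
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            builder.append(new SimpleRecord(System.currentTimeMillis(), "k2".getBytes(), "v2".getBytes()));
            MemoryRecords built = builder.build();
            System.out.println(built.sizeInBytes());
        }
    }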

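The LogEntryFilter -> RecordFilter rename above moves the compaction callback from shallow entries to individual records. A hedged sketch of a caller follows, assuming the filterTo(RecordFilter, ByteBuffer) entry point keeps the shape of the pre-rename filterTo and that Record exposes a hasKey() accessor; neither is shown in this hunk.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.Record;

    public class KeyedRecordFilterSketch {
        // Retain only records that carry a key, roughly what a log-cleaner-style
        // filter would do with the new per-record callback.
        static final MemoryRecords.RecordFilter RETAIN_KEYED = new MemoryRecords.RecordFilter() {
            @Override
            public boolean shouldRetain(Record record) {
                return record.hasKey(); // assumed accessor, not defined in this hunk
            }
        };

        static MemoryRecords.FilterResult filterKeyed(MemoryRecords records, ByteBuffer destination) {
            // filterTo(filter, destinationBuffer) is assumed to mirror the old
            // filterTo(LogEntryFilter, ByteBuffer) signature earlier in this file.
            return records.filterTo(RETAIN_KEYED, destination);
        }
    }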

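Similarly, the shallowEntries()/deepEntries() pair collapses into batches() plus the records() view that the new toString() uses. A small sketch of iterating both levels; all calls below appear in this diff except the per-record loop, which assumes records() is iterable in the way toString() implies.

    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MutableRecordBatch;
    import org.apache.kafka.common.record.Record;

    public class BatchIterationSketch {
        static void dump(MemoryRecords records) {
            // Batch-level view: replaces the old shallowEntries() iteration.
            for (MutableRecordBatch batch : records.batches())
                System.out.println("batch lastOffset=" + batch.lastOffset()
                        + " maxTimestamp=" + batch.maxTimestamp());

            // Record-level view: replaces the old deepEntries() iteration.
            for (Record record : records.records())
                System.out.println("record offset=" + record.offset());
        }
    }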