kafka-commits mailing list archives

From ij...@apache.org
Subject [06/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
Date Fri, 24 Mar 2017 19:43:59 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 8a80790..0dea6b6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
@@ -30,7 +32,11 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
@@ -39,6 +45,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -60,12 +67,14 @@ public class SenderTest {
     private static final int MAX_BLOCK_TIMEOUT = 1000;
     private static final int REQUEST_TIMEOUT = 1000;
 
-    private TopicPartition tp = new TopicPartition("test", 0);
+    private TopicPartition tp0 = new TopicPartition("test", 0);
+    private TopicPartition tp1 = new TopicPartition("test", 1);
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
     private int batchSize = 16 * 1024;
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners());
-    private Cluster cluster = TestUtils.singletonCluster("test", 1);
+    private ApiVersions apiVersions = new ApiVersions();
+    private Cluster cluster = TestUtils.singletonCluster("test", 2);
     private Metrics metrics = null;
     private RecordAccumulator accumulator = null;
     private Sender sender = null;
@@ -76,7 +85,7 @@ public class SenderTest {
         metricTags.put("client-id", CLIENT_ID);
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         metrics = new Metrics(metricConfig, time);
-        accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time);
+        accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions);
         sender = new Sender(client,
                             metadata,
                             this.accumulator,
@@ -86,7 +95,8 @@ public class SenderTest {
                             MAX_RETRIES,
                             metrics,
                             time,
-                            REQUEST_TIMEOUT);
+                            REQUEST_TIMEOUT,
+                            apiVersions);
 
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
     }
@@ -99,12 +109,12 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
         assertTrue(client.hasInFlightRequests());
-        client.respond(produceResponse(tp, offset, Errors.NONE, 0));
+        client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("All requests completed.", 0, client.inFlightRequestCount());
         assertFalse(client.hasInFlightRequests());
@@ -113,6 +123,101 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());
     }
 
+    @Test
+    public void testMessageFormatDownConversion() throws Exception {
+        // this test case verifies the behavior when the version of the produce request supported by the
+        // broker changes after the record set is created
+
+        long offset = 0;
+
+        // start off supporting produce request v3
+        apiVersions.update("0", NodeApiVersions.create());
+
+        Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
+                null, MAX_BLOCK_TIMEOUT).future;
+
+        // now the partition leader supports only v2
+        apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
+                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
+
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                ProduceRequest request = (ProduceRequest) body;
+                if (request.version() != 2)
+                    return false;
+
+                MemoryRecords records = request.partitionRecordsOrFail().get(tp0);
+                return records != null &&
+                        records.sizeInBytes() > 0 &&
+                        records.hasMatchingMagic(RecordBatch.MAGIC_VALUE_V1);
+            }
+        }, produceResponse(tp0, offset, Errors.NONE, 0));
+
+        sender.run(time.milliseconds()); // connect
+        sender.run(time.milliseconds()); // send produce request
+
+        assertTrue("Request should be completed", future.isDone());
+        assertEquals(offset, future.get().offset());
+    }
+
+    @Test
+    public void testDownConversionForMismatchedMagicValues() throws Exception {
+        // It can happen that we construct a record set with mismatching magic values (perhaps
+        // because the partition leader changed after the record set was initially constructed).
+        // In this case, we down-convert record sets with newer magic values to match the oldest
+        // created record set.
+
+        long offset = 0;
+
+        // start off supporting produce request v3
+        apiVersions.update("0", NodeApiVersions.create());
+
+        Future<RecordMetadata> future1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
+                null, MAX_BLOCK_TIMEOUT).future;
+
+        // now the partition leader supports only v2
+        apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
+                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
+
+        Future<RecordMetadata> future2 = accumulator.append(tp1, 0L, "key".getBytes(), "value".getBytes(),
+                null, MAX_BLOCK_TIMEOUT).future;
+
+        // the leader supports v3 again, but the tp1 batch was already created with the old magic
+        apiVersions.update("0", NodeApiVersions.create());
+
+        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(Errors.NONE, offset, RecordBatch.NO_TIMESTAMP);
+        Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = new HashMap<>();
+        partResp.put(tp0, resp);
+        partResp.put(tp1, resp);
+        ProduceResponse produceResponse = new ProduceResponse(partResp, 0);
+
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                ProduceRequest request = (ProduceRequest) body;
+                if (request.version() != 2)
+                    return false;
+
+                Map<TopicPartition, MemoryRecords> recordsMap = request.partitionRecordsOrFail();
+                if (recordsMap.size() != 2)
+                    return false;
+
+                for (MemoryRecords records : recordsMap.values()) {
+                    if (records == null || records.sizeInBytes() == 0 || !records.hasMatchingMagic(RecordBatch.MAGIC_VALUE_V1))
+                        return false;
+                }
+                return true;
+            }
+        }, produceResponse);
+
+        sender.run(time.milliseconds()); // connect
+        sender.run(time.milliseconds()); // send produce request
+
+        assertTrue("Request should be completed", future1.isDone());
+        assertTrue("Request should be completed", future2.isDone());
+    }
+
     /*
      * Send multiple requests. Verify that the client side quota metrics have the right values
      */
@@ -120,9 +225,9 @@ public class SenderTest {
     public void testQuotaMetrics() throws Exception {
         final long offset = 0;
         for (int i = 1; i <= 3; i++) {
-            accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT);
+            accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT);
             sender.run(time.milliseconds()); // send produce request
-            client.respond(produceResponse(tp, offset, Errors.NONE, 100 * i));
+            client.respond(produceResponse(tp0, offset, Errors.NONE, 100 * i));
             sender.run(time.milliseconds());
         }
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
@@ -147,9 +252,10 @@ public class SenderTest {
                                        maxRetries,
                                        m,
                                        time,
-                                       REQUEST_TIMEOUT);
+                                       REQUEST_TIMEOUT,
+                                       apiVersions);
             // do a successful retry
-            Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
             String id = client.requests().peek().destination();
@@ -167,13 +273,13 @@ public class SenderTest {
             assertEquals(1, client.inFlightRequestCount());
             assertTrue(client.hasInFlightRequests());
             long offset = 0;
-            client.respond(produceResponse(tp, offset, Errors.NONE, 0));
+            client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
             sender.run(time.milliseconds());
             assertTrue("Request should have retried and completed", future.isDone());
             assertEquals(offset, future.get().offset());
 
             // do an unsuccessful retry
-            future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+            future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // send produce request
             for (int i = 0; i < maxRetries + 1; i++) {
                 client.disconnect(client.requests().peek().destination());
@@ -202,7 +308,8 @@ public class SenderTest {
                 maxRetries,
                 m,
                 time,
-                REQUEST_TIMEOUT);
+                REQUEST_TIMEOUT,
+                apiVersions);
 
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
@@ -245,28 +352,28 @@ public class SenderTest {
         long offset = 0;
         metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
 
-        Future<RecordMetadata> future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
-        assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
+        assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         sender.run(time.milliseconds());  // send produce request
-        client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
+        client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         assertFalse(client.hasInFlightRequests());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
 
-        assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp.topic()));
+        assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp0.topic()));
         time.sleep(Metadata.TOPIC_EXPIRY_MS);
         metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
-        assertFalse("Unused topic has not been expired", metadata.containsTopic(tp.topic()));
-        future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        assertFalse("Unused topic has not been expired", metadata.containsTopic(tp0.topic()));
+        future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
-        assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
+        assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         sender.run(time.milliseconds());  // send produce request
-        client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
+        client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
         assertEquals("Request completed.", 0, client.inFlightRequestCount());
         assertFalse(client.hasInFlightRequests());
@@ -285,7 +392,7 @@ public class SenderTest {
     }
 
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
-        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, Record.NO_TIMESTAMP);
+        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }
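
The two new SenderTest cases above turn on a single mechanism: the broker's advertised PRODUCE version range (tracked via ApiVersions) decides which message format the sender may put on the wire, and the RequestMatcher checks the magic value of the record set that actually goes out. A rough standalone sketch of that pattern, using only classes that already appear in this diff and assuming the org.apache.kafka.clients and org.apache.kafka.common.record imports used by the tests (the node id "0" mirrors the test's single-broker setup):

    // Sketch: advertise that node "0" supports only PRODUCE v0-v2, forcing down-conversion.
    ApiVersions apiVersions = new ApiVersions();
    apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
            new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));

    // A record set written with the old format reports magic v1, which is what the
    // RequestMatcher in testMessageFormatDownConversion asserts on the outgoing request.
    MemoryRecords v1Records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
            CompressionType.NONE, TimestampType.CREATE_TIME,
            new SimpleRecord(1L, "key".getBytes(), "value".getBytes()));
    boolean isV1 = v1Records.hasMatchingMagic(RecordBatch.MAGIC_VALUE_V1); // true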

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
new file mode 100644
index 0000000..0f01f2a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.record.AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AbstractLegacyRecordBatchTest {
+
+    @Test
+    public void testSetLastOffsetCompressed() {
+        SimpleRecord[] simpleRecords = new SimpleRecord[] {
+            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
+        };
+
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+                CompressionType.GZIP, TimestampType.CREATE_TIME, simpleRecords);
+
+        long lastOffset = 500L;
+        long firstOffset = lastOffset - simpleRecords.length + 1;
+
+        ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+        batch.setLastOffset(lastOffset);
+        assertEquals(lastOffset, batch.lastOffset());
+        assertEquals(firstOffset, batch.baseOffset());
+        assertTrue(batch.isValid());
+
+        List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+        assertEquals(1, recordBatches.size());
+        assertEquals(lastOffset, recordBatches.get(0).lastOffset());
+
+        long offset = firstOffset;
+        for (Record record : records.records())
+            assertEquals(offset++, record.offset());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetNoTimestampTypeNotAllowed() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+                CompressionType.GZIP, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+        ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+        batch.setMaxTimestamp(TimestampType.NO_TIMESTAMP_TYPE, RecordBatch.NO_TIMESTAMP);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSetLogAppendTimeNotAllowedV0() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, 0L,
+                CompressionType.GZIP, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+        long logAppendTime = 15L;
+        ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+        batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, logAppendTime);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSetCreateTimeNotAllowedV0() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, 0L,
+                CompressionType.GZIP, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+        long createTime = 15L;
+        ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+        batch.setMaxTimestamp(TimestampType.CREATE_TIME, createTime);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSetPartitionLeaderEpochNotAllowedV0() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, 0L,
+                CompressionType.GZIP, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+        ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+        batch.setPartitionLeaderEpoch(15);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testSetPartitionLeaderEpochNotAllowedV1() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+                CompressionType.GZIP, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+        ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+        batch.setPartitionLeaderEpoch(15);
+    }
+
+    @Test
+    public void testSetLogAppendTimeV1() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+                CompressionType.GZIP, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+        long logAppendTime = 15L;
+
+        ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+        batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, logAppendTime);
+        assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
+        assertEquals(logAppendTime, batch.maxTimestamp());
+        assertTrue(batch.isValid());
+
+        List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+        assertEquals(1, recordBatches.size());
+        assertEquals(TimestampType.LOG_APPEND_TIME, recordBatches.get(0).timestampType());
+        assertEquals(logAppendTime, recordBatches.get(0).maxTimestamp());
+
+        for (Record record : records.records())
+            assertEquals(logAppendTime, record.timestamp());
+    }
+
+    @Test
+    public void testSetCreateTimeV1() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+                CompressionType.GZIP, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+        long createTime = 15L;
+
+        ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+        batch.setMaxTimestamp(TimestampType.CREATE_TIME, createTime);
+        assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+        assertEquals(createTime, batch.maxTimestamp());
+        assertTrue(batch.isValid());
+
+        List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+        assertEquals(1, recordBatches.size());
+        assertEquals(TimestampType.CREATE_TIME, recordBatches.get(0).timestampType());
+        assertEquals(createTime, recordBatches.get(0).maxTimestamp());
+
+        long expectedTimestamp = 1L;
+        for (Record record : records.records())
+            assertEquals(expectedTimestamp++, record.timestamp());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
index 6475009..3745006 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
@@ -16,95 +16,109 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class ByteBufferLogInputStreamTest {
 
     @Test
     public void iteratorIgnoresIncompleteEntries() {
-        ByteBuffer buffer = ByteBuffer.allocate(2048);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
         builder.append(15L, "a".getBytes(), "1".getBytes());
         builder.append(20L, "b".getBytes(), "2".getBytes());
+        builder.close();
 
-        ByteBuffer recordsBuffer = builder.build().buffer();
-        recordsBuffer.limit(recordsBuffer.limit() - 5);
+        builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 2L);
+        builder.append(30L, "c".getBytes(), "3".getBytes());
+        builder.append(40L, "d".getBytes(), "4".getBytes());
+        builder.close();
 
-        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = MemoryRecords.readableRecords(recordsBuffer).shallowEntries().iterator();
+        buffer.flip();
+        buffer.limit(buffer.limit() - 5);
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        Iterator<MutableRecordBatch> iterator = records.batches().iterator();
         assertTrue(iterator.hasNext());
-        ByteBufferLogInputStream.ByteBufferLogEntry first = iterator.next();
-        assertEquals(0L, first.offset());
+        MutableRecordBatch first = iterator.next();
+        assertEquals(1L, first.lastOffset());
 
         assertFalse(iterator.hasNext());
     }
 
-    @Test
-    public void testSetCreateTimeV1() {
-        ByteBuffer buffer = ByteBuffer.allocate(2048);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+    @Test(expected = CorruptRecordException.class)
+    public void iteratorRaisesOnTooSmallRecords() throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
         builder.append(15L, "a".getBytes(), "1".getBytes());
-        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
-
-        assertTrue(iterator.hasNext());
-        ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
-
-        long createTimeMs = 20L;
-        entry.setCreateTime(createTimeMs);
+        builder.append(20L, "b".getBytes(), "2".getBytes());
+        builder.close();
 
-        assertEquals(TimestampType.CREATE_TIME, entry.record().timestampType());
-        assertEquals(createTimeMs, entry.record().timestamp());
-    }
+        int position = buffer.position();
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testSetCreateTimeNotAllowedV0() {
-        ByteBuffer buffer = ByteBuffer.allocate(2048);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
-        builder.append(15L, "a".getBytes(), "1".getBytes());
-        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
+        builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 2L);
+        builder.append(30L, "c".getBytes(), "3".getBytes());
+        builder.append(40L, "d".getBytes(), "4".getBytes());
+        builder.close();
 
-        assertTrue(iterator.hasNext());
-        ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+        buffer.flip();
+        buffer.putInt(position + DefaultRecordBatch.LENGTH_OFFSET, 9);
 
-        long createTimeMs = 20L;
-        entry.setCreateTime(createTimeMs);
+        ByteBufferLogInputStream logInputStream = new ByteBufferLogInputStream(buffer, Integer.MAX_VALUE);
+        assertNotNull(logInputStream.nextBatch());
+        logInputStream.nextBatch();
     }
 
-    @Test
-    public void testSetLogAppendTimeV1() {
-        ByteBuffer buffer = ByteBuffer.allocate(2048);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+    @Test(expected = CorruptRecordException.class)
+    public void iteratorRaisesOnInvalidMagic() throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
         builder.append(15L, "a".getBytes(), "1".getBytes());
-        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
+        builder.append(20L, "b".getBytes(), "2".getBytes());
+        builder.close();
 
-        assertTrue(iterator.hasNext());
-        ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+        int position = buffer.position();
 
-        long logAppendTime = 20L;
-        entry.setLogAppendTime(logAppendTime);
+        builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 2L);
+        builder.append(30L, "c".getBytes(), "3".getBytes());
+        builder.append(40L, "d".getBytes(), "4".getBytes());
+        builder.close();
 
-        assertEquals(TimestampType.LOG_APPEND_TIME, entry.record().timestampType());
-        assertEquals(logAppendTime, entry.record().timestamp());
+        buffer.flip();
+        buffer.put(position + DefaultRecordBatch.MAGIC_OFFSET, (byte) 37);
+
+        ByteBufferLogInputStream logInputStream = new ByteBufferLogInputStream(buffer, Integer.MAX_VALUE);
+        assertNotNull(logInputStream.nextBatch());
+        logInputStream.nextBatch();
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testSetLogAppendTimeNotAllowedV0() {
-        ByteBuffer buffer = ByteBuffer.allocate(2048);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+    @Test(expected = CorruptRecordException.class)
+    public void iteratorRaisesOnTooLargeRecords() throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
         builder.append(15L, "a".getBytes(), "1".getBytes());
-        Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
+        builder.append(20L, "b".getBytes(), "2".getBytes());
+        builder.close();
 
-        assertTrue(iterator.hasNext());
-        ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+        builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 2L);
+        builder.append(30L, "c".getBytes(), "3".getBytes());
+        builder.append(40L, "d".getBytes(), "4".getBytes());
+        builder.close();
+
+        buffer.flip();
 
-        long logAppendTime = 20L;
-        entry.setLogAppendTime(logAppendTime);
+        ByteBufferLogInputStream logInputStream = new ByteBufferLogInputStream(buffer, 25);
+        assertNotNull(logInputStream.nextBatch());
+        logInputStream.nextBatch();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
index 21cbfb7..98dc591 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
@@ -31,13 +31,13 @@ public class CompressionTypeTest {
     public void testLZ4FramingMagicV0() {
         ByteBuffer buffer = ByteBuffer.allocate(256);
         KafkaLZ4BlockOutputStream out = (KafkaLZ4BlockOutputStream) CompressionType.LZ4.wrapForOutput(
-                new ByteBufferOutputStream(buffer), Record.MAGIC_VALUE_V0, 256);
+                new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V0, 256);
         assertTrue(out.useBrokenFlagDescriptorChecksum());
 
         buffer.rewind();
 
         KafkaLZ4BlockInputStream in = (KafkaLZ4BlockInputStream) CompressionType.LZ4.wrapForInput(
-                new ByteBufferInputStream(buffer), Record.MAGIC_VALUE_V0);
+                new ByteBufferInputStream(buffer), RecordBatch.MAGIC_VALUE_V0);
         assertTrue(in.ignoreFlagDescriptorChecksum());
     }
 
@@ -45,13 +45,13 @@ public class CompressionTypeTest {
     public void testLZ4FramingMagicV1() {
         ByteBuffer buffer = ByteBuffer.allocate(256);
         KafkaLZ4BlockOutputStream out = (KafkaLZ4BlockOutputStream) CompressionType.LZ4.wrapForOutput(
-                new ByteBufferOutputStream(buffer), Record.MAGIC_VALUE_V1, 256);
+                new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V1, 256);
         assertFalse(out.useBrokenFlagDescriptorChecksum());
 
         buffer.rewind();
 
         KafkaLZ4BlockInputStream in = (KafkaLZ4BlockInputStream) CompressionType.LZ4.wrapForInput(
-                new ByteBufferInputStream(buffer), Record.MAGIC_VALUE_V1);
+                new ByteBufferInputStream(buffer), RecordBatch.MAGIC_VALUE_V1);
         assertFalse(in.ignoreFlagDescriptorChecksum());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java
new file mode 100644
index 0000000..7c0faa5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class ControlRecordTypeTest {
+
+    @Test
+    public void testParseUnknownType() {
+        ByteBuffer buffer = ByteBuffer.allocate(32);
+        buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION);
+        buffer.putShort((short) 337);
+        buffer.flip();
+        ControlRecordType type = ControlRecordType.parse(buffer);
+        assertEquals(ControlRecordType.UNKNOWN, type);
+    }
+
+    @Test
+    public void testParseUnknownVersion() {
+        ByteBuffer buffer = ByteBuffer.allocate(32);
+        buffer.putShort((short) 5);
+        buffer.putShort(ControlRecordType.ABORT.type);
+        buffer.putInt(23432); // some field added in version 5
+        buffer.flip();
+        ControlRecordType type = ControlRecordType.parse(buffer);
+        assertEquals(ControlRecordType.ABORT, type);
+    }
+
+}
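
The two tests above also pin down the control record key layout introduced by this change: a 16-bit version followed by a 16-bit type, where an unrecognized type parses to UNKNOWN and a newer version is still parsed from its known leading fields. A minimal sketch of building and parsing such a key, assuming the same package-level access to ControlRecordType.type that the test has and a java.nio.ByteBuffer import:

    // Control record key layout (per the tests above): [version: int16][type: int16][...]
    ByteBuffer key = ByteBuffer.allocate(4);
    key.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION);
    key.putShort(ControlRecordType.COMMIT.type);
    key.flip();
    ControlRecordType parsed = ControlRecordType.parse(key); // ControlRecordType.COMMIT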

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
new file mode 100644
index 0000000..b02c5c9
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DefaultRecordBatchTest {
+
+    @Test
+    public void buildDefaultRecordBatch() {
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
+                TimestampType.CREATE_TIME, 1234567L);
+        builder.appendWithOffset(1234567, System.currentTimeMillis(), "a".getBytes(), "v".getBytes());
+        builder.appendWithOffset(1234568, System.currentTimeMillis(), "b".getBytes(), "v".getBytes());
+
+        MemoryRecords records = builder.build();
+        for (MutableRecordBatch batch : records.batches()) {
+            assertEquals(1234567, batch.baseOffset());
+            assertEquals(1234568, batch.lastOffset());
+            assertTrue(batch.isValid());
+
+            for (Record record : batch) {
+                assertTrue(record.isValid());
+            }
+        }
+    }
+
+    @Test
+    public void testSizeInBytes() {
+        Header[] headers = new Header[] {
+            new Header("foo", "value".getBytes()),
+            new Header("bar", Utils.wrapNullable(null))
+        };
+
+        long timestamp = System.currentTimeMillis();
+        SimpleRecord[] records = new SimpleRecord[] {
+            new SimpleRecord(timestamp, "key".getBytes(), "value".getBytes()),
+            new SimpleRecord(timestamp + 30000, null, "value".getBytes()),
+            new SimpleRecord(timestamp + 60000, "key".getBytes(), null),
+            new SimpleRecord(timestamp + 60000, "key".getBytes(), "value".getBytes(), headers)
+        };
+        int actualSize = MemoryRecords.withRecords(CompressionType.NONE, records).sizeInBytes();
+        assertEquals(actualSize, DefaultRecordBatch.sizeInBytes(Arrays.asList(records)));
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidRecordSize() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+                CompressionType.NONE, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+        ByteBuffer buffer = records.buffer();
+        buffer.putInt(DefaultRecordBatch.LENGTH_OFFSET, 10);
+
+        DefaultRecordBatch batch = new DefaultRecordBatch(buffer);
+        assertFalse(batch.isValid());
+        batch.ensureValid();
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidCrc() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+                CompressionType.NONE, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+        ByteBuffer buffer = records.buffer();
+        buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, 23);
+
+        DefaultRecordBatch batch = new DefaultRecordBatch(buffer);
+        assertFalse(batch.isValid());
+        batch.ensureValid();
+    }
+
+    @Test
+    public void testSetLastOffset() {
+        SimpleRecord[] simpleRecords = new SimpleRecord[] {
+            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
+        };
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+                CompressionType.NONE, TimestampType.CREATE_TIME, simpleRecords);
+
+        long lastOffset = 500L;
+        long firstOffset = lastOffset - simpleRecords.length + 1;
+
+        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+        batch.setLastOffset(lastOffset);
+        assertEquals(lastOffset, batch.lastOffset());
+        assertEquals(firstOffset, batch.baseOffset());
+        assertTrue(batch.isValid());
+
+        List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+        assertEquals(1, recordBatches.size());
+        assertEquals(lastOffset, recordBatches.get(0).lastOffset());
+
+        long offset = firstOffset;
+        for (Record record : records.records())
+            assertEquals(offset++, record.offset());
+    }
+
+    @Test
+    public void testSetPartitionLeaderEpoch() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+                CompressionType.NONE, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+        int leaderEpoch = 500;
+
+        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+        batch.setPartitionLeaderEpoch(leaderEpoch);
+        assertEquals(leaderEpoch, batch.partitionLeaderEpoch());
+        assertTrue(batch.isValid());
+
+        List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+        assertEquals(1, recordBatches.size());
+        assertEquals(leaderEpoch, recordBatches.get(0).partitionLeaderEpoch());
+    }
+
+    @Test
+    public void testSetLogAppendTime() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+                CompressionType.NONE, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+        long logAppendTime = 15L;
+
+        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+        batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, logAppendTime);
+        assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
+        assertEquals(logAppendTime, batch.maxTimestamp());
+        assertTrue(batch.isValid());
+
+        List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+        assertEquals(1, recordBatches.size());
+        assertEquals(logAppendTime, recordBatches.get(0).maxTimestamp());
+        assertEquals(TimestampType.LOG_APPEND_TIME, recordBatches.get(0).timestampType());
+
+        for (Record record : records.records())
+            assertEquals(logAppendTime, record.timestamp());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetNoTimestampTypeNotAllowed() {
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+                CompressionType.NONE, TimestampType.CREATE_TIME,
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+        batch.setMaxTimestamp(TimestampType.NO_TIMESTAMP_TYPE, RecordBatch.NO_TIMESTAMP);
+    }
+
+    @Test
+    public void testReadAndWriteControlRecord() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+
+        builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.COMMIT, null);
+        builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.ABORT, null);
+        MemoryRecords records = builder.build();
+
+        List<Record> logRecords = TestUtils.toList(records.records());
+        assertEquals(2, logRecords.size());
+
+        Record commitRecord = logRecords.get(0);
+        assertTrue(commitRecord.isControlRecord());
+        assertEquals(ControlRecordType.COMMIT, ControlRecordType.parse(commitRecord.key()));
+
+        Record abortRecord = logRecords.get(1);
+        assertTrue(abortRecord.isControlRecord());
+        assertEquals(ControlRecordType.ABORT, ControlRecordType.parse(abortRecord.key()));
+    }
+
+}
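
One pattern in DefaultRecordBatchTest is worth spelling out: setLastOffset and setPartitionLeaderEpoch mutate header fields in place and the batch still passes isValid(), while overwriting the last-offset-delta field (testInvalidCrc) breaks it, which suggests those leading header fields sit outside the region covered by the new batch-level CRC. A small sketch of that distinction, under that assumption and using only the accessors exercised above:

    MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
            CompressionType.NONE, TimestampType.CREATE_TIME,
            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()));

    DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
    batch.setLastOffset(42L);             // header field, assumed to sit outside the CRC'd region
    batch.setPartitionLeaderEpoch(7);     // likewise
    boolean stillValid = batch.isValid(); // true, as in testSetLastOffset / testSetPartitionLeaderEpoch

    // Overwriting a field inside the CRC'd region (as testInvalidCrc does) breaks validation.
    records.buffer().putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, 23);
    boolean nowValid = new DefaultRecordBatch(records.buffer()).isValid(); // false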

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
new file mode 100644
index 0000000..8502475
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultRecordTest {
+
+    @Test
+    public void testBasicSerde() {
+        Header[] headers = new Header[] {
+            new Header("foo", "value".getBytes()),
+            new Header("bar", Utils.wrapNullable(null)),
+            new Header("\"A\\u00ea\\u00f1\\u00fcC\"", "value".getBytes())
+        };
+
+        SimpleRecord[] records = new SimpleRecord[] {
+            new SimpleRecord("hi".getBytes(), "there".getBytes()),
+            new SimpleRecord(null, "there".getBytes()),
+            new SimpleRecord("hi".getBytes(), null),
+            new SimpleRecord(null, null),
+            new SimpleRecord(15L, "hi".getBytes(), "there".getBytes(), headers)
+        };
+
+        for (boolean isControlRecord : Arrays.asList(true, false)) {
+            for (SimpleRecord record : records) {
+                int baseSequence = 723;
+                long baseOffset = 37;
+                int offsetDelta = 10;
+                long baseTimestamp = System.currentTimeMillis();
+                long timestampDelta = 323;
+
+                ByteBuffer buffer = ByteBuffer.allocate(1024);
+                DefaultRecord.writeTo(buffer, isControlRecord, offsetDelta, timestampDelta, record.key(),
+                        record.value(), record.headers());
+                buffer.flip();
+
+                DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
+                assertNotNull(logRecord);
+                assertEquals(baseOffset + offsetDelta, logRecord.offset());
+                assertEquals(baseSequence + offsetDelta, logRecord.sequence());
+                assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp());
+                assertEquals(record.key(), logRecord.key());
+                assertEquals(record.value(), logRecord.value());
+                assertEquals(isControlRecord, logRecord.isControlRecord());
+                assertArrayEquals(record.headers(), logRecord.headers());
+                assertEquals(DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
+                        record.headers()), logRecord.sizeInBytes());
+            }
+        }
+    }
+
+    @Test
+    public void testSerdeNoSequence() {
+        ByteBuffer key = ByteBuffer.wrap("hi".getBytes());
+        ByteBuffer value = ByteBuffer.wrap("there".getBytes());
+        long baseOffset = 37;
+        int offsetDelta = 10;
+        long baseTimestamp = System.currentTimeMillis();
+        long timestampDelta = 323;
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        DefaultRecord.writeTo(buffer, false, offsetDelta, timestampDelta, key, value, new Header[0]);
+        buffer.flip();
+
+        DefaultRecord record = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, RecordBatch.NO_SEQUENCE, null);
+        assertNotNull(record);
+        assertEquals(RecordBatch.NO_SEQUENCE, record.sequence());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
new file mode 100644
index 0000000..65de01c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class FileLogInputStreamTest {
+
+    @Test
+    public void testWriteTo() throws IOException {
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("foo".getBytes()),
+                    new SimpleRecord("bar".getBytes())));
+            fileRecords.flush();
+
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), Integer.MAX_VALUE, 0,
+                    fileRecords.sizeInBytes());
+
+            FileLogInputStream.FileChannelRecordBatch batch = logInputStream.nextBatch();
+            assertNotNull(batch);
+            assertEquals(RecordBatch.MAGIC_VALUE_V2, batch.magic());
+
+            ByteBuffer buffer = ByteBuffer.allocate(128);
+            batch.writeTo(buffer);
+            buffer.flip();
+
+            MemoryRecords memRecords = MemoryRecords.readableRecords(buffer);
+            List<Record> records = Utils.toList(memRecords.records().iterator());
+            assertEquals(2, records.size());
+            Record record0 = records.get(0);
+            assertTrue(record0.hasMagic(RecordBatch.MAGIC_VALUE_V2));
+            assertEquals("foo", Utils.utf8(record0.value(), record0.valueSize()));
+            Record record1 = records.get(1);
+            assertTrue(record1.hasMagic(RecordBatch.MAGIC_VALUE_V2));
+            assertEquals("bar", Utils.utf8(record1.value(), record1.valueSize()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 7a4dc1e..7e780dd 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.kafka.test.TestUtils.tempFile;
@@ -37,16 +38,16 @@ import static org.junit.Assert.fail;
 
 public class FileRecordsTest {
 
-    private Record[] records = new Record[] {
-            Record.create("abcd".getBytes()),
-            Record.create("efgh".getBytes()),
-            Record.create("ijkl".getBytes())
+    private byte[][] values = new byte[][] {
+            "abcd".getBytes(),
+            "efgh".getBytes(),
+            "ijkl".getBytes()
     };
     private FileRecords fileRecords;
 
     @Before
     public void setup() throws IOException {
-        this.fileRecords = createFileRecords(records);
+        this.fileRecords = createFileRecords(values);
     }
 
     /**
@@ -56,7 +57,7 @@ public class FileRecordsTest {
     public void testFileSize() throws IOException {
         assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
         for (int i = 0; i < 20; i++) {
-            fileRecords.append(MemoryRecords.withRecords(Record.create("abcd".getBytes())));
+            fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("abcd".getBytes())));
             assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
         }
     }
@@ -83,7 +84,11 @@ public class FileRecordsTest {
         fileRecords.channel().write(buffer);
 
         // appending those bytes should not change the contents
-        TestUtils.checkEquals(Arrays.asList(records), fileRecords.records());
+        Iterator<Record> records = fileRecords.records().iterator();
+        for (byte[] value : values) {
+            assertTrue(records.hasNext());
+            assertEquals(records.next().value(), ByteBuffer.wrap(value));
+        }
     }
 
     /**
@@ -92,7 +97,11 @@ public class FileRecordsTest {
     @Test
     public void testIterationDoesntChangePosition() throws IOException {
         long position = fileRecords.channel().position();
-        TestUtils.checkEquals(Arrays.asList(records), fileRecords.records());
+        Iterator<Record> records = fileRecords.records().iterator();
+        for (byte[] value : values) {
+            assertTrue(records.hasNext());
+            assertEquals(records.next().value(), ByteBuffer.wrap(value));
+        }
         assertEquals(position, fileRecords.channel().position());
     }
 
@@ -102,18 +111,18 @@ public class FileRecordsTest {
     @Test
     public void testRead() throws IOException {
         FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
-        TestUtils.checkEquals(fileRecords.shallowEntries(), read.shallowEntries());
+        TestUtils.checkEquals(fileRecords.batches(), read.batches());
 
-        List<LogEntry> items = shallowEntries(read);
-        LogEntry second = items.get(1);
+        List<RecordBatch> items = batches(read);
+        RecordBatch second = items.get(1);
 
         read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes());
         assertEquals("Try a read starting from the second message",
-                items.subList(1, 3), shallowEntries(read));
+                items.subList(1, 3), batches(read));
 
         read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes());
         assertEquals("Try a read of a single message starting from the second message",
-                Collections.singletonList(second), shallowEntries(read));
+                Collections.singletonList(second), batches(read));
     }
 
     /**
@@ -122,28 +131,28 @@ public class FileRecordsTest {
     @Test
     public void testSearch() throws IOException {
         // append a new message with a high offset
-        Record lastMessage = Record.create("test".getBytes());
-        fileRecords.append(MemoryRecords.withRecords(50L, lastMessage));
+        SimpleRecord lastMessage = new SimpleRecord("test".getBytes());
+        fileRecords.append(MemoryRecords.withRecords(50L, CompressionType.NONE, lastMessage));
 
-        List<LogEntry> entries = shallowEntries(fileRecords);
+        List<RecordBatch> batches = batches(fileRecords);
         int position = 0;
 
-        int message1Size = entries.get(0).sizeInBytes();
+        int message1Size = batches.get(0).sizeInBytes();
         assertEquals("Should be able to find the first message by its offset",
                 new FileRecords.LogEntryPosition(0L, position, message1Size),
                 fileRecords.searchForOffsetWithSize(0, 0));
         position += message1Size;
 
-        int message2Size = entries.get(1).sizeInBytes();
+        int message2Size = batches.get(1).sizeInBytes();
         assertEquals("Should be able to find second message when starting from 0",
                 new FileRecords.LogEntryPosition(1L, position, message2Size),
                 fileRecords.searchForOffsetWithSize(1, 0));
         assertEquals("Should be able to find second message starting from its offset",
                 new FileRecords.LogEntryPosition(1L, position, message2Size),
                 fileRecords.searchForOffsetWithSize(1, position));
-        position += message2Size + entries.get(2).sizeInBytes();
+        position += message2Size + batches.get(2).sizeInBytes();
 
-        int message4Size = entries.get(3).sizeInBytes();
+        int message4Size = batches.get(3).sizeInBytes();
         assertEquals("Should be able to find fourth message from a non-existant offset",
                 new FileRecords.LogEntryPosition(50L, position, message4Size),
                 fileRecords.searchForOffsetWithSize(3, position));
@@ -157,13 +166,13 @@ public class FileRecordsTest {
      */
     @Test
     public void testIteratorWithLimits() throws IOException {
-        LogEntry entry = shallowEntries(fileRecords).get(1);
+        RecordBatch batch = batches(fileRecords).get(1);
         int start = fileRecords.searchForOffsetWithSize(1, 0).position;
-        int size = entry.sizeInBytes();
+        int size = batch.sizeInBytes();
         FileRecords slice = fileRecords.read(start, size);
-        assertEquals(Collections.singletonList(entry), shallowEntries(slice));
+        assertEquals(Collections.singletonList(batch), batches(slice));
         FileRecords slice2 = fileRecords.read(start, size - 1);
-        assertEquals(Collections.emptyList(), shallowEntries(slice2));
+        assertEquals(Collections.emptyList(), batches(slice2));
     }
 
     /**
@@ -171,11 +180,11 @@ public class FileRecordsTest {
      */
     @Test
     public void testTruncate() throws IOException {
-        LogEntry entry = shallowEntries(fileRecords).get(0);
+        RecordBatch batch = batches(fileRecords).get(0);
         int end = fileRecords.searchForOffsetWithSize(1, 0).position;
         fileRecords.truncateTo(end);
-        assertEquals(Collections.singletonList(entry), shallowEntries(fileRecords));
-        assertEquals(entry.sizeInBytes(), fileRecords.sizeInBytes());
+        assertEquals(Collections.singletonList(batch), batches(fileRecords));
+        assertEquals(batch.sizeInBytes(), fileRecords.sizeInBytes());
     }
 
     /**
@@ -274,14 +283,14 @@ public class FileRecordsTest {
     @Test
     public void testPreallocateClearShutdown() throws IOException {
         File temp = tempFile();
-        FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
-        set.append(MemoryRecords.withRecords(records));
+        FileRecords fileRecords = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
+        append(fileRecords, values);
 
-        int oldPosition = (int) set.channel().position();
-        int oldSize = set.sizeInBytes();
-        assertEquals(fileRecords.sizeInBytes(), oldPosition);
-        assertEquals(fileRecords.sizeInBytes(), oldSize);
-        set.close();
+        int oldPosition = (int) fileRecords.channel().position();
+        int oldSize = fileRecords.sizeInBytes();
+        assertEquals(this.fileRecords.sizeInBytes(), oldPosition);
+        assertEquals(this.fileRecords.sizeInBytes(), oldSize);
+        fileRecords.close();
 
         File tempReopen = new File(temp.getAbsolutePath());
         FileRecords setReopen = FileRecords.open(tempReopen, true, 512 * 1024 * 1024, true);
@@ -295,104 +304,106 @@ public class FileRecordsTest {
 
     @Test
     public void testFormatConversionWithPartialMessage() throws IOException {
-        LogEntry entry = shallowEntries(fileRecords).get(1);
+        RecordBatch batch = batches(fileRecords).get(1);
         int start = fileRecords.searchForOffsetWithSize(1, 0).position;
-        int size = entry.sizeInBytes();
+        int size = batch.sizeInBytes();
         FileRecords slice = fileRecords.read(start, size - 1);
-        Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
-        assertTrue("No message should be there", shallowEntries(messageV0).isEmpty());
+        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0);
+        assertTrue("No message should be there", batches(messageV0).isEmpty());
         assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
     }
 
     @Test
-    public void testConvertNonCompressedToMagic1() throws IOException {
-        List<LogEntry> entries = Arrays.asList(
-                LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())),
-                LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes())));
-        MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries);
-
-        // Up conversion. In reality we only do down conversion, but up conversion should work as well.
-        // up conversion for non-compressed messages
-        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
-            fileRecords.append(records);
-            fileRecords.flush();
-            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
-            verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
-        }
-    }
-
-    @Test
-    public void testConvertCompressedToMagic1() throws IOException {
-        List<LogEntry> entries = Arrays.asList(
-                LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())),
-                LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes())));
-        MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries);
-
-        // up conversion for compressed messages
-        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
-            fileRecords.append(records);
-            fileRecords.flush();
-            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
-            verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
-        }
-    }
-
-    @Test
-    public void testConvertNonCompressedToMagic0() throws IOException {
-        List<LogEntry> entries = Arrays.asList(
-                LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())),
-                LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes())));
-        MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries);
-
-        // down conversion for non-compressed messages
-        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
-            fileRecords.append(records);
-            fileRecords.flush();
-            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
-            verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
-        }
+    public void testConversion() throws IOException {
+        doTestConversion(CompressionType.NONE, RecordBatch.MAGIC_VALUE_V0);
+        doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V0);
+        doTestConversion(CompressionType.NONE, RecordBatch.MAGIC_VALUE_V1);
+        doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V1);
+        doTestConversion(CompressionType.NONE, RecordBatch.MAGIC_VALUE_V2);
+        doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V2);
     }
 
-    @Test
-    public void testConvertCompressedToMagic0() throws IOException {
-        List<LogEntry> entries = Arrays.asList(
-                LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())),
-                LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes())));
-        MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries);
+    private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
+        List<Long> offsets = Arrays.asList(0L, 2L, 3L, 9L, 11L, 15L);
+        List<SimpleRecord> records = Arrays.asList(
+                new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
+                new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
+                new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
+                new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
+                new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
+                new SimpleRecord(6L, "k6".getBytes(), "goodbye forever".getBytes()));
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+                TimestampType.CREATE_TIME, 0L);
+        for (int i = 0; i < 2; i++)
+            builder.appendWithOffset(offsets.get(i), records.get(i));
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L);
+        for (int i = 2; i < 4; i++)
+            builder.appendWithOffset(offsets.get(i), records.get(i));
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
+                TimestampType.CREATE_TIME, 0L);
+        for (int i = 4; i < 6; i++)
+            builder.appendWithOffset(offsets.get(i), records.get(i));
+        builder.close();
+
+        buffer.flip();
 
-        // down conversion for compressed messages
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
-            fileRecords.append(records);
+            fileRecords.append(MemoryRecords.readableRecords(buffer));
             fileRecords.flush();
-            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
-            verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
+            Records convertedRecords = fileRecords.downConvert(toMagic);
+            verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
         }
     }
 
-    private void verifyConvertedMessageSet(List<LogEntry> initialEntries, Records convertedRecords, byte magicByte) {
+    private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
+                                        List<Long> initialOffsets,
+                                        Records convertedRecords,
+                                        CompressionType compressionType,
+                                        byte magicByte) {
         int i = 0;
-        for (LogEntry logEntry : deepEntries(convertedRecords)) {
-            assertEquals("magic byte should be " + magicByte, magicByte, logEntry.record().magic());
-            assertEquals("offset should not change", initialEntries.get(i).offset(), logEntry.offset());
-            assertEquals("key should not change", initialEntries.get(i).record().key(), logEntry.record().key());
-            assertEquals("payload should not change", initialEntries.get(i).record().value(), logEntry.record().value());
-            i += 1;
+        for (RecordBatch batch : convertedRecords.batches()) {
+            assertTrue("Magic byte should be lower than or equal to " + magicByte, batch.magic() <= magicByte);
+            assertEquals("Compression type should not be affected by conversion", compressionType, batch.compressionType());
+            for (Record record : batch) {
+                assertTrue("Inner record should have magic " + magicByte, record.hasMagic(batch.magic()));
+                assertEquals("Offset should not change", initialOffsets.get(i).longValue(), record.offset());
+                assertEquals("Key should not change", initialRecords.get(i).key(), record.key());
+                assertEquals("Value should not change", initialRecords.get(i).value(), record.value());
+                if (batch.magic() > RecordBatch.MAGIC_VALUE_V0)
+                    assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
+                i += 1;
+            }
         }
+        assertEquals(initialOffsets.size(), i);
     }
 
-    private static List<LogEntry> shallowEntries(Records buffer) {
-        return TestUtils.toList(buffer.shallowEntries());
+    private static List<RecordBatch> batches(Records buffer) {
+        return TestUtils.toList(buffer.batches());
     }
 
-    private static List<LogEntry> deepEntries(Records buffer) {
-        return TestUtils.toList(buffer.deepEntries());
+    private FileRecords createFileRecords(byte[][] values) throws IOException {
+        FileRecords fileRecords = FileRecords.open(tempFile());
+        append(fileRecords, values);
+        return fileRecords;
     }
 
-    private FileRecords createFileRecords(Record ... records) throws IOException {
-        FileRecords fileRecords = FileRecords.open(tempFile());
-        fileRecords.append(MemoryRecords.withRecords(records));
+    private void append(FileRecords fileRecords, byte[][] values) throws IOException {
+        long offset = 0L;
+        for (byte[] value : values) {
+            ByteBuffer buffer = ByteBuffer.allocate(128);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
+                    CompressionType.NONE, TimestampType.CREATE_TIME, offset);
+            builder.appendWithOffset(offset++, System.currentTimeMillis(), null, value);
+            fileRecords.append(builder.build());
+        }
         fileRecords.flush();
-        return fileRecords;
     }
 
 }
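
The four toMessageFormat tests above collapse into a single downConvert path. A compact sketch of that API (illustrative only, not part of the commit): records are appended with MemoryRecordsBuilder at an explicit magic value, written to a FileRecords, and down-converted to an older format. The temp-file handling and class name are assumptions made for a self-contained example; the record-layer calls are the ones used in the test.

    // Sketch: build v2 records, append them to a file-backed log, then down-convert to v1.
    import java.io.File;
    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.Records;
    import org.apache.kafka.common.record.TimestampType;

    public class DownConvertSketch {
        public static void main(String[] args) throws Exception {
            // Write two v2 records into a buffer at explicit offsets.
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            builder.appendWithOffset(0L, System.currentTimeMillis(), "k1".getBytes(), "hello".getBytes());
            builder.appendWithOffset(1L, System.currentTimeMillis(), "k2".getBytes(), "goodbye".getBytes());
            builder.close();
            buffer.flip();

            // Append to a file-backed log and down-convert to the v1 message format.
            File file = File.createTempFile("file-records-sketch", ".log");
            try (FileRecords fileRecords = FileRecords.open(file)) {
                fileRecords.append(MemoryRecords.readableRecords(buffer));
                fileRecords.flush();
                Records converted = fileRecords.downConvert(RecordBatch.MAGIC_VALUE_V1);
                for (RecordBatch batch : converted.batches())
                    System.out.println("converted batch magic: " + batch.magic());
            }
        }
    }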

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java
new file mode 100644
index 0000000..9480c60
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(value = Parameterized.class)
+public class LegacyRecordTest {
+
+    private final byte magic;
+    private final long timestamp;
+    private final ByteBuffer key;
+    private final ByteBuffer value;
+    private final CompressionType compression;
+    private final TimestampType timestampType;
+    private final LegacyRecord record;
+
+    public LegacyRecordTest(byte magic, long timestamp, byte[] key, byte[] value, CompressionType compression) {
+        this.magic = magic;
+        this.timestamp = timestamp;
+        this.timestampType = TimestampType.CREATE_TIME;
+        this.key = key == null ? null : ByteBuffer.wrap(key);
+        this.value = value == null ? null : ByteBuffer.wrap(value);
+        this.compression = compression;
+        this.record = LegacyRecord.create(magic, timestamp, key, value, compression, timestampType);
+    }
+
+    @Test
+    public void testFields() {
+        assertEquals(compression, record.compressionType());
+        assertEquals(key != null, record.hasKey());
+        assertEquals(key, record.key());
+        if (key != null)
+            assertEquals(key.limit(), record.keySize());
+        assertEquals(magic, record.magic());
+        assertEquals(value, record.value());
+        if (value != null)
+            assertEquals(value.limit(), record.valueSize());
+        if (magic > 0) {
+            assertEquals(timestamp, record.timestamp());
+            assertEquals(timestampType, record.timestampType());
+        } else {
+            assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
+            assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
+        }
+    }
+
+    @Test
+    public void testChecksum() {
+        assertEquals(record.checksum(), record.computeChecksum());
+
+        byte attributes = LegacyRecord.computeAttributes(magic, this.compression, TimestampType.CREATE_TIME);
+        assertEquals(record.checksum(), LegacyRecord.computeChecksum(
+                magic,
+                attributes,
+                this.timestamp,
+                this.key == null ? null : this.key.array(),
+                this.value == null ? null : this.value.array()
+        ));
+        assertTrue(record.isValid());
+        for (int i = LegacyRecord.CRC_OFFSET + LegacyRecord.CRC_LENGTH; i < record.sizeInBytes(); i++) {
+            LegacyRecord copy = copyOf(record);
+            copy.buffer().put(i, (byte) 69);
+            assertFalse(copy.isValid());
+            try {
+                copy.ensureValid();
+                fail("Should fail the above test.");
+            } catch (InvalidRecordException e) {
+                // this is good
+            }
+        }
+    }
+
+    private LegacyRecord copyOf(LegacyRecord record) {
+        ByteBuffer buffer = ByteBuffer.allocate(record.sizeInBytes());
+        record.buffer().put(buffer);
+        buffer.rewind();
+        record.buffer().rewind();
+        return new LegacyRecord(buffer);
+    }
+
+    @Test
+    public void testEquality() {
+        assertEquals(record, copyOf(record));
+    }
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        byte[] payload = new byte[1000];
+        Arrays.fill(payload, (byte) 1);
+        List<Object[]> values = new ArrayList<>();
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1))
+            for (long timestamp : Arrays.asList(RecordBatch.NO_TIMESTAMP, 0L, 1L))
+                for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
+                    for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
+                        for (CompressionType compression : CompressionType.values())
+                            values.add(new Object[] {magic, timestamp, key, value, compression});
+        return values;
+    }
+
+}
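
For reference, a minimal sketch (not from the commit) of the LegacyRecord surface this renamed test covers: creating a record in the pre-v2 ("legacy") format and checking its CRC. Only methods used in the test appear; the class name and printed output are invented for illustration.

    // Sketch: create a legacy (magic v1) record and validate its stored checksum.
    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LegacyRecord;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;

    public class LegacyRecordSketch {
        public static void main(String[] args) throws Exception {
            LegacyRecord record = LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, System.currentTimeMillis(),
                    ByteBuffer.wrap("key".getBytes()), ByteBuffer.wrap("value".getBytes()),
                    CompressionType.NONE, TimestampType.CREATE_TIME);

            // The stored CRC should match the one recomputed over the record body.
            System.out.println("stored checksum:   " + record.checksum());
            System.out.println("computed checksum: " + record.computeChecksum());
            System.out.println("valid: " + record.isValid());
            record.ensureValid(); // throws InvalidRecordException if the record is corrupt
        }
    }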

