http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 8a80790..0dea6b6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.clients.producer.internals;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
@@ -30,7 +32,11 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
@@ -39,6 +45,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -60,12 +67,14 @@ public class SenderTest {
private static final int MAX_BLOCK_TIMEOUT = 1000;
private static final int REQUEST_TIMEOUT = 1000;
- private TopicPartition tp = new TopicPartition("test", 0);
+ private TopicPartition tp0 = new TopicPartition("test", 0);
+ private TopicPartition tp1 = new TopicPartition("test", 1);
private MockTime time = new MockTime();
private MockClient client = new MockClient(time);
private int batchSize = 16 * 1024;
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners());
- private Cluster cluster = TestUtils.singletonCluster("test", 1);
+ private ApiVersions apiVersions = new ApiVersions();
+ private Cluster cluster = TestUtils.singletonCluster("test", 2);
private Metrics metrics = null;
private RecordAccumulator accumulator = null;
private Sender sender = null;
@@ -76,7 +85,7 @@ public class SenderTest {
metricTags.put("client-id", CLIENT_ID);
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
metrics = new Metrics(metricConfig, time);
- accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time);
+ accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions);
sender = new Sender(client,
metadata,
this.accumulator,
@@ -86,7 +95,8 @@ public class SenderTest {
MAX_RETRIES,
metrics,
time,
- REQUEST_TIMEOUT);
+ REQUEST_TIMEOUT,
+ apiVersions);
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
}
@@ -99,12 +109,12 @@ public class SenderTest {
@Test
public void testSimple() throws Exception {
long offset = 0;
- Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
assertTrue(client.hasInFlightRequests());
- client.respond(produceResponse(tp, offset, Errors.NONE, 0));
+ client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
sender.run(time.milliseconds());
assertEquals("All requests completed.", 0, client.inFlightRequestCount());
assertFalse(client.hasInFlightRequests());
@@ -113,6 +123,101 @@ public class SenderTest {
assertEquals(offset, future.get().offset());
}
+ @Test
+ public void testMessageFormatDownConversion() throws Exception {
+ // this test case verifies the behavior when the version of the produce request supported by the
+ // broker changes after the record set is created
+
+ long offset = 0;
+
+ // start off supporting produce request v3
+ apiVersions.update("0", NodeApiVersions.create());
+
+ Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
+ null, MAX_BLOCK_TIMEOUT).future;
+
+ // now the partition leader supports only v2
+ apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
+ new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
+
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ ProduceRequest request = (ProduceRequest) body;
+ if (request.version() != 2)
+ return false;
+
+ MemoryRecords records = request.partitionRecordsOrFail().get(tp0);
+ return records != null &&
+ records.sizeInBytes() > 0 &&
+ records.hasMatchingMagic(RecordBatch.MAGIC_VALUE_V1);
+ }
+ }, produceResponse(tp0, offset, Errors.NONE, 0));
+
+ sender.run(time.milliseconds()); // connect
+ sender.run(time.milliseconds()); // send produce request
+
+ assertTrue("Request should be completed", future.isDone());
+ assertEquals(offset, future.get().offset());
+ }
+
+ @Test
+ public void testDownConversionForMismatchedMagicValues() throws Exception {
+ // it can happen that we construct a record set with mismatching magic values (perhaps
+ // because the partition leader changed after the record set was initially constructed);
+ // in this case, we down-convert record sets with newer magic values to match the oldest
+ // created record set
+
+ long offset = 0;
+
+ // start off supporting produce request v3
+ apiVersions.update("0", NodeApiVersions.create());
+
+ Future<RecordMetadata> future1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
+ null, MAX_BLOCK_TIMEOUT).future;
+
+ // now the partition leader supports only v2
+ apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
+ new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
+
+ Future<RecordMetadata> future2 = accumulator.append(tp1, 0L, "key".getBytes(), "value".getBytes(),
+ null, MAX_BLOCK_TIMEOUT).future;
+
+ // now the partition leader supports produce request v3 again
+ apiVersions.update("0", NodeApiVersions.create());
+
+ ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(Errors.NONE, offset, RecordBatch.NO_TIMESTAMP);
+ Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = new HashMap<>();
+ partResp.put(tp0, resp);
+ partResp.put(tp1, resp);
+ ProduceResponse produceResponse = new ProduceResponse(partResp, 0);
+
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ ProduceRequest request = (ProduceRequest) body;
+ if (request.version() != 2)
+ return false;
+
+ Map<TopicPartition, MemoryRecords> recordsMap = request.partitionRecordsOrFail();
+ if (recordsMap.size() != 2)
+ return false;
+
+ for (MemoryRecords records : recordsMap.values()) {
+ if (records == null || records.sizeInBytes() == 0 || !records.hasMatchingMagic(RecordBatch.MAGIC_VALUE_V1))
+ return false;
+ }
+ return true;
+ }
+ }, produceResponse);
+
+ sender.run(time.milliseconds()); // connect
+ sender.run(time.milliseconds()); // send produce request
+
+ assertTrue("Request should be completed", future1.isDone());
+ assertTrue("Request should be completed", future2.isDone());
+ }
+
/*
* Send multiple requests. Verify that the client side quota metrics have the right values
*/
@@ -120,9 +225,9 @@ public class SenderTest {
public void testQuotaMetrics() throws Exception {
final long offset = 0;
for (int i = 1; i <= 3; i++) {
- accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT);
+ accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT);
sender.run(time.milliseconds()); // send produce request
- client.respond(produceResponse(tp, offset, Errors.NONE, 100 * i));
+ client.respond(produceResponse(tp0, offset, Errors.NONE, 100 * i));
sender.run(time.milliseconds());
}
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
@@ -147,9 +252,10 @@ public class SenderTest {
maxRetries,
m,
time,
- REQUEST_TIMEOUT);
+ REQUEST_TIMEOUT,
+ apiVersions);
// do a successful retry
- Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
String id = client.requests().peek().destination();
@@ -167,13 +273,13 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.hasInFlightRequests());
long offset = 0;
- client.respond(produceResponse(tp, offset, Errors.NONE, 0));
+ client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
sender.run(time.milliseconds());
assertTrue("Request should have retried and completed", future.isDone());
assertEquals(offset, future.get().offset());
// do an unsuccessful retry
- future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // send produce request
for (int i = 0; i < maxRetries + 1; i++) {
client.disconnect(client.requests().peek().destination());
@@ -202,7 +308,8 @@ public class SenderTest {
maxRetries,
m,
time,
- REQUEST_TIMEOUT);
+ REQUEST_TIMEOUT,
+ apiVersions);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
@@ -245,28 +352,28 @@ public class SenderTest {
long offset = 0;
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
- Future<RecordMetadata> future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
- assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
+ assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
sender.run(time.milliseconds()); // send produce request
- client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
+ client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
sender.run(time.milliseconds());
assertEquals("Request completed.", 0, client.inFlightRequestCount());
assertFalse(client.hasInFlightRequests());
sender.run(time.milliseconds());
assertTrue("Request should be completed", future.isDone());
- assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp.topic()));
+ assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp0.topic()));
time.sleep(Metadata.TOPIC_EXPIRY_MS);
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
- assertFalse("Unused topic has not been expired", metadata.containsTopic(tp.topic()));
- future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ assertFalse("Unused topic has not been expired", metadata.containsTopic(tp0.topic()));
+ future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
- assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
+ assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
sender.run(time.milliseconds()); // send produce request
- client.respond(produceResponse(tp, offset++, Errors.NONE, 0));
+ client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
sender.run(time.milliseconds());
assertEquals("Request completed.", 0, client.inFlightRequestCount());
assertFalse(client.hasInFlightRequests());
@@ -285,7 +392,7 @@ public class SenderTest {
}
private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
- ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, Record.NO_TIMESTAMP);
+ ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP);
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
return new ProduceResponse(partResp, throttleTimeMs);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
new file mode 100644
index 0000000..0f01f2a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.record.AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AbstractLegacyRecordBatchTest {
+
+ @Test
+ public void testSetLastOffsetCompressed() {
+ SimpleRecord[] simpleRecords = new SimpleRecord[] {
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
+ };
+
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME, simpleRecords);
+
+ long lastOffset = 500L;
+ long firstOffset = lastOffset - simpleRecords.length + 1;
+
+ ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setLastOffset(lastOffset);
+ assertEquals(lastOffset, batch.lastOffset());
+ assertEquals(firstOffset, batch.baseOffset());
+ assertTrue(batch.isValid());
+
+ List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+ assertEquals(1, recordBatches.size());
+ assertEquals(lastOffset, recordBatches.get(0).lastOffset());
+
+ long offset = firstOffset;
+ for (Record record : records.records())
+ assertEquals(offset++, record.offset());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetNoTimestampTypeNotAllowed() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+ ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setMaxTimestamp(TimestampType.NO_TIMESTAMP_TYPE, RecordBatch.NO_TIMESTAMP);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSetLogAppendTimeNotAllowedV0() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+ long logAppendTime = 15L;
+ ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, logAppendTime);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSetCreateTimeNotAllowedV0() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+ long createTime = 15L;
+ ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setMaxTimestamp(TimestampType.CREATE_TIME, createTime);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSetPartitionLeaderEpochNotAllowedV0() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+ ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setPartitionLeaderEpoch(15);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSetPartitionLeaderEpochNotAllowedV1() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+ ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setPartitionLeaderEpoch(15);
+ }
+
+ @Test
+ public void testSetLogAppendTimeV1() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+ long logAppendTime = 15L;
+
+ ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, logAppendTime);
+ assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
+ assertEquals(logAppendTime, batch.maxTimestamp());
+ assertTrue(batch.isValid());
+
+ List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+ assertEquals(1, recordBatches.size());
+ assertEquals(TimestampType.LOG_APPEND_TIME, recordBatches.get(0).timestampType());
+ assertEquals(logAppendTime, recordBatches.get(0).maxTimestamp());
+
+ for (Record record : records.records())
+ assertEquals(logAppendTime, record.timestamp());
+ }
+
+ @Test
+ public void testSetCreateTimeV1() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+ long createTime = 15L;
+
+ ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setMaxTimestamp(TimestampType.CREATE_TIME, createTime);
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+ assertEquals(createTime, batch.maxTimestamp());
+ assertTrue(batch.isValid());
+
+ List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+ assertEquals(1, recordBatches.size());
+ assertEquals(TimestampType.CREATE_TIME, recordBatches.get(0).timestampType());
+ assertEquals(createTime, recordBatches.get(0).maxTimestamp());
+
+ long expectedTimestamp = 1L;
+ for (Record record : records.records())
+ assertEquals(expectedTimestamp++, record.timestamp());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
index 6475009..3745006 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
@@ -16,95 +16,109 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.errors.CorruptRecordException;
import org.junit.Test;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class ByteBufferLogInputStreamTest {
@Test
public void iteratorIgnoresIncompleteEntries() {
- ByteBuffer buffer = ByteBuffer.allocate(2048);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(15L, "a".getBytes(), "1".getBytes());
builder.append(20L, "b".getBytes(), "2".getBytes());
+ builder.close();
- ByteBuffer recordsBuffer = builder.build().buffer();
- recordsBuffer.limit(recordsBuffer.limit() - 5);
+ builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 2L);
+ builder.append(30L, "c".getBytes(), "3".getBytes());
+ builder.append(40L, "d".getBytes(), "4".getBytes());
+ builder.close();
- Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = MemoryRecords.readableRecords(recordsBuffer).shallowEntries().iterator();
+ buffer.flip();
+ buffer.limit(buffer.limit() - 5);
+
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ Iterator<MutableRecordBatch> iterator = records.batches().iterator();
assertTrue(iterator.hasNext());
- ByteBufferLogInputStream.ByteBufferLogEntry first = iterator.next();
- assertEquals(0L, first.offset());
+ MutableRecordBatch first = iterator.next();
+ assertEquals(1L, first.lastOffset());
assertFalse(iterator.hasNext());
}
- @Test
- public void testSetCreateTimeV1() {
- ByteBuffer buffer = ByteBuffer.allocate(2048);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ @Test(expected = CorruptRecordException.class)
+ public void iteratorRaisesOnTooSmallRecords() throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(15L, "a".getBytes(), "1".getBytes());
- Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
-
- assertTrue(iterator.hasNext());
- ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
-
- long createTimeMs = 20L;
- entry.setCreateTime(createTimeMs);
+ builder.append(20L, "b".getBytes(), "2".getBytes());
+ builder.close();
- assertEquals(TimestampType.CREATE_TIME, entry.record().timestampType());
- assertEquals(createTimeMs, entry.record().timestamp());
- }
+ int position = buffer.position();
- @Test(expected = IllegalArgumentException.class)
- public void testSetCreateTimeNotAllowedV0() {
- ByteBuffer buffer = ByteBuffer.allocate(2048);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
- builder.append(15L, "a".getBytes(), "1".getBytes());
- Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
+ builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 2L);
+ builder.append(30L, "c".getBytes(), "3".getBytes());
+ builder.append(40L, "d".getBytes(), "4".getBytes());
+ builder.close();
- assertTrue(iterator.hasNext());
- ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+ buffer.flip();
+ buffer.putInt(position + DefaultRecordBatch.LENGTH_OFFSET, 9);
- long createTimeMs = 20L;
- entry.setCreateTime(createTimeMs);
+ ByteBufferLogInputStream logInputStream = new ByteBufferLogInputStream(buffer, Integer.MAX_VALUE);
+ assertNotNull(logInputStream.nextBatch());
+ logInputStream.nextBatch();
}
- @Test
- public void testSetLogAppendTimeV1() {
- ByteBuffer buffer = ByteBuffer.allocate(2048);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ @Test(expected = CorruptRecordException.class)
+ public void iteratorRaisesOnInvalidMagic() throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(15L, "a".getBytes(), "1".getBytes());
- Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
+ builder.append(20L, "b".getBytes(), "2".getBytes());
+ builder.close();
- assertTrue(iterator.hasNext());
- ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+ int position = buffer.position();
- long logAppendTime = 20L;
- entry.setLogAppendTime(logAppendTime);
+ builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 2L);
+ builder.append(30L, "c".getBytes(), "3".getBytes());
+ builder.append(40L, "d".getBytes(), "4".getBytes());
+ builder.close();
- assertEquals(TimestampType.LOG_APPEND_TIME, entry.record().timestampType());
- assertEquals(logAppendTime, entry.record().timestamp());
+ buffer.flip();
+ buffer.put(position + DefaultRecordBatch.MAGIC_OFFSET, (byte) 37);
+
+ ByteBufferLogInputStream logInputStream = new ByteBufferLogInputStream(buffer, Integer.MAX_VALUE);
+ assertNotNull(logInputStream.nextBatch());
+ logInputStream.nextBatch();
}
- @Test(expected = IllegalArgumentException.class)
- public void testSetLogAppendTimeNotAllowedV0() {
- ByteBuffer buffer = ByteBuffer.allocate(2048);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ @Test(expected = CorruptRecordException.class)
+ public void iteratorRaisesOnTooLargeRecords() throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
builder.append(15L, "a".getBytes(), "1".getBytes());
- Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator();
+ builder.append(20L, "b".getBytes(), "2".getBytes());
+ builder.close();
- assertTrue(iterator.hasNext());
- ByteBufferLogInputStream.ByteBufferLogEntry entry = iterator.next();
+ builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 2L);
+ builder.append(30L, "c".getBytes(), "3".getBytes());
+ builder.append(40L, "d".getBytes(), "4".getBytes());
+ builder.close();
+
+ buffer.flip();
- long logAppendTime = 20L;
- entry.setLogAppendTime(logAppendTime);
+ ByteBufferLogInputStream logInputStream = new ByteBufferLogInputStream(buffer, 25);
+ assertNotNull(logInputStream.nextBatch());
+ logInputStream.nextBatch();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
index 21cbfb7..98dc591 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
@@ -31,13 +31,13 @@ public class CompressionTypeTest {
public void testLZ4FramingMagicV0() {
ByteBuffer buffer = ByteBuffer.allocate(256);
KafkaLZ4BlockOutputStream out = (KafkaLZ4BlockOutputStream) CompressionType.LZ4.wrapForOutput(
- new ByteBufferOutputStream(buffer), Record.MAGIC_VALUE_V0, 256);
+ new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V0, 256);
assertTrue(out.useBrokenFlagDescriptorChecksum());
buffer.rewind();
KafkaLZ4BlockInputStream in = (KafkaLZ4BlockInputStream) CompressionType.LZ4.wrapForInput(
- new ByteBufferInputStream(buffer), Record.MAGIC_VALUE_V0);
+ new ByteBufferInputStream(buffer), RecordBatch.MAGIC_VALUE_V0);
assertTrue(in.ignoreFlagDescriptorChecksum());
}
@@ -45,13 +45,13 @@ public class CompressionTypeTest {
public void testLZ4FramingMagicV1() {
ByteBuffer buffer = ByteBuffer.allocate(256);
KafkaLZ4BlockOutputStream out = (KafkaLZ4BlockOutputStream) CompressionType.LZ4.wrapForOutput(
- new ByteBufferOutputStream(buffer), Record.MAGIC_VALUE_V1, 256);
+ new ByteBufferOutputStream(buffer), RecordBatch.MAGIC_VALUE_V1, 256);
assertFalse(out.useBrokenFlagDescriptorChecksum());
buffer.rewind();
KafkaLZ4BlockInputStream in = (KafkaLZ4BlockInputStream) CompressionType.LZ4.wrapForInput(
- new ByteBufferInputStream(buffer), Record.MAGIC_VALUE_V1);
+ new ByteBufferInputStream(buffer), RecordBatch.MAGIC_VALUE_V1);
assertFalse(in.ignoreFlagDescriptorChecksum());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java
new file mode 100644
index 0000000..7c0faa5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class ControlRecordTypeTest { // exercises ControlRecordType.parse with an unrecognised type id and with a non-current key version
+
+ @Test
+ public void testParseUnknownType() {
+ ByteBuffer buffer = ByteBuffer.allocate(32);
+ buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION); // first short: the current key version
+ buffer.putShort((short) 337); // second short: a type id this test expects to be parsed as UNKNOWN
+ buffer.flip(); // switch the buffer from writing to reading before it is parsed
+ ControlRecordType type = ControlRecordType.parse(buffer);
+ assertEquals(ControlRecordType.UNKNOWN, type);
+ }
+
+ @Test
+ public void testParseUnknownVersion() {
+ ByteBuffer buffer = ByteBuffer.allocate(32);
+ buffer.putShort((short) 5); // key version; presumably newer than CURRENT_CONTROL_RECORD_KEY_VERSION - confirm
+ buffer.putShort(ControlRecordType.ABORT.type);
+ buffer.putInt(23432); // some field added in version 5
+ buffer.flip();
+ ControlRecordType type = ControlRecordType.parse(buffer);
+ assertEquals(ControlRecordType.ABORT, type); // the known type id must still be recognised despite the extra trailing field
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
new file mode 100644
index 0000000..b02c5c9
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DefaultRecordBatchTest {
+
+ @Test
+ public void buildDefaultRecordBatch() {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
+ TimestampType.CREATE_TIME, 1234567L);
+ builder.appendWithOffset(1234567, System.currentTimeMillis(), "a".getBytes(), "v".getBytes());
+ builder.appendWithOffset(1234568, System.currentTimeMillis(), "b".getBytes(), "v".getBytes());
+
+ MemoryRecords records = builder.build();
+ for (MutableRecordBatch batch : records.batches()) {
+ assertEquals(1234567, batch.baseOffset());
+ assertEquals(1234568, batch.lastOffset());
+ assertTrue(batch.isValid());
+
+ for (Record record : batch) {
+ assertTrue(record.isValid());
+ }
+ }
+ }
+
+ @Test
+ public void testSizeInBytes() {
+ Header[] headers = new Header[] {
+ new Header("foo", "value".getBytes()),
+ new Header("bar", Utils.wrapNullable(null))
+ };
+
+ long timestamp = System.currentTimeMillis();
+ SimpleRecord[] records = new SimpleRecord[] {
+ new SimpleRecord(timestamp, "key".getBytes(), "value".getBytes()),
+ new SimpleRecord(timestamp + 30000, null, "value".getBytes()),
+ new SimpleRecord(timestamp + 60000, "key".getBytes(), null),
+ new SimpleRecord(timestamp + 60000, "key".getBytes(), "value".getBytes(), headers)
+ };
+ int actualSize = MemoryRecords.withRecords(CompressionType.NONE, records).sizeInBytes();
+ assertEquals(actualSize, DefaultRecordBatch.sizeInBytes(Arrays.asList(records)));
+ }
+
+ @Test(expected = InvalidRecordException.class) // a failing assertFalse below would raise AssertionError instead and fail the test
+ public void testInvalidRecordSize() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+ CompressionType.NONE, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+ ByteBuffer buffer = records.buffer();
+ buffer.putInt(DefaultRecordBatch.LENGTH_OFFSET, 10); // overwrite the int at LENGTH_OFFSET in place with 10 to corrupt the batch
+
+ DefaultRecordBatch batch = new DefaultRecordBatch(buffer);
+ assertFalse(batch.isValid());
+ batch.ensureValid(); // expected to throw InvalidRecordException for the corrupted batch
+ }
+
+ @Test(expected = InvalidRecordException.class) // a failing assertFalse below would raise AssertionError instead and fail the test
+ public void testInvalidCrc() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+ CompressionType.NONE, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+ ByteBuffer buffer = records.buffer();
+ buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, 23); // change a header int in place; presumably leaves the stored CRC stale - confirm
+
+ DefaultRecordBatch batch = new DefaultRecordBatch(buffer);
+ assertFalse(batch.isValid());
+ batch.ensureValid(); // expected to throw InvalidRecordException for the corrupted batch
+ }
+
+ @Test
+ public void testSetLastOffset() {
+ SimpleRecord[] simpleRecords = new SimpleRecord[] {
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
+ };
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+ CompressionType.NONE, TimestampType.CREATE_TIME, simpleRecords);
+
+ long lastOffset = 500L;
+ long firstOffset = lastOffset - simpleRecords.length + 1;
+
+ DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+ batch.setLastOffset(lastOffset);
+ assertEquals(lastOffset, batch.lastOffset());
+ assertEquals(firstOffset, batch.baseOffset());
+ assertTrue(batch.isValid());
+
+ List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+ assertEquals(1, recordBatches.size());
+ assertEquals(lastOffset, recordBatches.get(0).lastOffset());
+
+ long offset = firstOffset;
+ for (Record record : records.records())
+ assertEquals(offset++, record.offset());
+ }
+
+ @Test
+ public void testSetPartitionLeaderEpoch() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+ CompressionType.NONE, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+ int leaderEpoch = 500;
+
+ DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+ batch.setPartitionLeaderEpoch(leaderEpoch);
+ assertEquals(leaderEpoch, batch.partitionLeaderEpoch());
+ assertTrue(batch.isValid());
+
+ List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+ assertEquals(1, recordBatches.size());
+ assertEquals(leaderEpoch, recordBatches.get(0).partitionLeaderEpoch());
+ }
+
+ @Test
+ public void testSetLogAppendTime() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+ CompressionType.NONE, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+
+ long logAppendTime = 15L;
+
+ DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+ batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, logAppendTime);
+ assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
+ assertEquals(logAppendTime, batch.maxTimestamp());
+ assertTrue(batch.isValid());
+
+ List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
+ assertEquals(1, recordBatches.size());
+ assertEquals(logAppendTime, recordBatches.get(0).maxTimestamp());
+ assertEquals(TimestampType.LOG_APPEND_TIME, recordBatches.get(0).timestampType());
+
+ for (Record record : records.records())
+ assertEquals(logAppendTime, record.timestamp());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetNoTimestampTypeNotAllowed() {
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+ CompressionType.NONE, TimestampType.CREATE_TIME,
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
+ DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+ batch.setMaxTimestamp(TimestampType.NO_TIMESTAMP_TYPE, RecordBatch.NO_TIMESTAMP);
+ }
+
+ @Test
+ public void testReadAndWriteControlRecord() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
+ TimestampType.CREATE_TIME, 0L);
+
+ builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.COMMIT, null);
+ builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.ABORT, null);
+ MemoryRecords records = builder.build();
+
+ List<Record> logRecords = TestUtils.toList(records.records());
+ assertEquals(2, logRecords.size());
+
+ Record commitRecord = logRecords.get(0);
+ assertTrue(commitRecord.isControlRecord());
+ assertEquals(ControlRecordType.COMMIT, ControlRecordType.parse(commitRecord.key()));
+
+ Record abortRecord = logRecords.get(1);
+ assertTrue(abortRecord.isControlRecord());
+ assertEquals(ControlRecordType.ABORT, ControlRecordType.parse(abortRecord.key()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
new file mode 100644
index 0000000..8502475
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultRecordTest {
+
+ @Test
+ public void testBasicSerde() {
+ Header[] headers = new Header[] {
+ new Header("foo", "value".getBytes()),
+ new Header("bar", Utils.wrapNullable(null)),
+ new Header("\"A\\u00ea\\u00f1\\u00fcC\"", "value".getBytes())
+ };
+
+ SimpleRecord[] records = new SimpleRecord[] {
+ new SimpleRecord("hi".getBytes(), "there".getBytes()),
+ new SimpleRecord(null, "there".getBytes()),
+ new SimpleRecord("hi".getBytes(), null),
+ new SimpleRecord(null, null),
+ new SimpleRecord(15L, "hi".getBytes(), "there".getBytes(), headers)
+ };
+
+ for (boolean isControlRecord : Arrays.asList(true, false)) {
+ for (SimpleRecord record : records) {
+ int baseSequence = 723;
+ long baseOffset = 37;
+ int offsetDelta = 10;
+ long baseTimestamp = System.currentTimeMillis();
+ long timestampDelta = 323;
+
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ DefaultRecord.writeTo(buffer, isControlRecord, offsetDelta, timestampDelta, record.key(),
+ record.value(), record.headers());
+ buffer.flip();
+
+ DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
+ assertNotNull(logRecord);
+ assertEquals(baseOffset + offsetDelta, logRecord.offset());
+ assertEquals(baseSequence + offsetDelta, logRecord.sequence());
+ assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp());
+ assertEquals(record.key(), logRecord.key());
+ assertEquals(record.value(), logRecord.value());
+ assertEquals(isControlRecord, logRecord.isControlRecord());
+ assertArrayEquals(record.headers(), logRecord.headers());
+ assertEquals(DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
+ record.headers()), logRecord.sizeInBytes());
+ }
+ }
+ }
+
+ @Test
+ public void testSerdeNoSequence() {
+ ByteBuffer key = ByteBuffer.wrap("hi".getBytes());
+ ByteBuffer value = ByteBuffer.wrap("there".getBytes());
+ long baseOffset = 37;
+ int offsetDelta = 10;
+ long baseTimestamp = System.currentTimeMillis();
+ long timestampDelta = 323;
+
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ DefaultRecord.writeTo(buffer, false, offsetDelta, timestampDelta, key, value, new Header[0]);
+ buffer.flip();
+
+ DefaultRecord record = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, RecordBatch.NO_SEQUENCE, null);
+ assertNotNull(record);
+ assertEquals(RecordBatch.NO_SEQUENCE, record.sequence());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
new file mode 100644
index 0000000..65de01c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class FileLogInputStreamTest {
+
+ @Test
+ public void testWriteTo() throws IOException {
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("foo".getBytes()),
+ new SimpleRecord("bar".getBytes())));
+ fileRecords.flush();
+
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), Integer.MAX_VALUE, 0,
+ fileRecords.sizeInBytes());
+
+ FileLogInputStream.FileChannelRecordBatch batch = logInputStream.nextBatch();
+ assertNotNull(batch);
+ assertEquals(RecordBatch.MAGIC_VALUE_V2, batch.magic());
+
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ batch.writeTo(buffer);
+ buffer.flip();
+
+ MemoryRecords memRecords = MemoryRecords.readableRecords(buffer);
+ List<Record> records = Utils.toList(memRecords.records().iterator());
+ assertEquals(2, records.size());
+ Record record0 = records.get(0);
+ assertTrue(record0.hasMagic(RecordBatch.MAGIC_VALUE_V2));
+ assertEquals("foo", Utils.utf8(record0.value(), record0.valueSize()));
+ Record record1 = records.get(1);
+ assertTrue(record1.hasMagic(RecordBatch.MAGIC_VALUE_V2));
+ assertEquals("bar", Utils.utf8(record1.value(), record1.valueSize()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 7a4dc1e..7e780dd 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import static org.apache.kafka.test.TestUtils.tempFile;
@@ -37,16 +38,16 @@ import static org.junit.Assert.fail;
public class FileRecordsTest {
- private Record[] records = new Record[] {
- Record.create("abcd".getBytes()),
- Record.create("efgh".getBytes()),
- Record.create("ijkl".getBytes())
+ private byte[][] values = new byte[][] {
+ "abcd".getBytes(),
+ "efgh".getBytes(),
+ "ijkl".getBytes()
};
private FileRecords fileRecords;
@Before
public void setup() throws IOException {
- this.fileRecords = createFileRecords(records);
+ this.fileRecords = createFileRecords(values);
}
/**
@@ -56,7 +57,7 @@ public class FileRecordsTest {
public void testFileSize() throws IOException {
assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
for (int i = 0; i < 20; i++) {
- fileRecords.append(MemoryRecords.withRecords(Record.create("abcd".getBytes())));
+ fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("abcd".getBytes())));
assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
}
}
@@ -83,7 +84,11 @@ public class FileRecordsTest {
fileRecords.channel().write(buffer);
// appending those bytes should not change the contents
- TestUtils.checkEquals(Arrays.asList(records), fileRecords.records());
+ Iterator<Record> records = fileRecords.records().iterator(); // walk the stored records in order
+ for (byte[] value : values) {
+ assertTrue(records.hasNext());
+ assertEquals(ByteBuffer.wrap(value), records.next().value()); // JUnit order: expected first, then actual
+ }
}
/**
@@ -92,7 +97,11 @@ public class FileRecordsTest {
@Test
public void testIterationDoesntChangePosition() throws IOException {
long position = fileRecords.channel().position();
- TestUtils.checkEquals(Arrays.asList(records), fileRecords.records());
+ Iterator<Record> records = fileRecords.records().iterator(); // walk the stored records in order
+ for (byte[] value : values) {
+ assertTrue(records.hasNext());
+ assertEquals(ByteBuffer.wrap(value), records.next().value()); // JUnit order: expected first, then actual
+ }
assertEquals(position, fileRecords.channel().position());
}
@@ -102,18 +111,18 @@ public class FileRecordsTest {
@Test
public void testRead() throws IOException {
FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
- TestUtils.checkEquals(fileRecords.shallowEntries(), read.shallowEntries());
+ TestUtils.checkEquals(fileRecords.batches(), read.batches());
- List<LogEntry> items = shallowEntries(read);
- LogEntry second = items.get(1);
+ List<RecordBatch> items = batches(read);
+ RecordBatch second = items.get(1);
read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes());
assertEquals("Try a read starting from the second message",
- items.subList(1, 3), shallowEntries(read));
+ items.subList(1, 3), batches(read));
read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes());
assertEquals("Try a read of a single message starting from the second message",
- Collections.singletonList(second), shallowEntries(read));
+ Collections.singletonList(second), batches(read));
}
/**
@@ -122,28 +131,28 @@ public class FileRecordsTest {
@Test
public void testSearch() throws IOException {
// append a new message with a high offset
- Record lastMessage = Record.create("test".getBytes());
- fileRecords.append(MemoryRecords.withRecords(50L, lastMessage));
+ SimpleRecord lastMessage = new SimpleRecord("test".getBytes());
+ fileRecords.append(MemoryRecords.withRecords(50L, CompressionType.NONE, lastMessage));
- List<LogEntry> entries = shallowEntries(fileRecords);
+ List<RecordBatch> batches = batches(fileRecords);
int position = 0;
- int message1Size = entries.get(0).sizeInBytes();
+ int message1Size = batches.get(0).sizeInBytes();
assertEquals("Should be able to find the first message by its offset",
new FileRecords.LogEntryPosition(0L, position, message1Size),
fileRecords.searchForOffsetWithSize(0, 0));
position += message1Size;
- int message2Size = entries.get(1).sizeInBytes();
+ int message2Size = batches.get(1).sizeInBytes();
assertEquals("Should be able to find second message when starting from 0",
new FileRecords.LogEntryPosition(1L, position, message2Size),
fileRecords.searchForOffsetWithSize(1, 0));
assertEquals("Should be able to find second message starting from its offset",
new FileRecords.LogEntryPosition(1L, position, message2Size),
fileRecords.searchForOffsetWithSize(1, position));
- position += message2Size + entries.get(2).sizeInBytes();
+ position += message2Size + batches.get(2).sizeInBytes();
- int message4Size = entries.get(3).sizeInBytes();
+ int message4Size = batches.get(3).sizeInBytes();
assertEquals("Should be able to find fourth message from a non-existant offset",
new FileRecords.LogEntryPosition(50L, position, message4Size),
fileRecords.searchForOffsetWithSize(3, position));
@@ -157,13 +166,13 @@ public class FileRecordsTest {
*/
@Test
public void testIteratorWithLimits() throws IOException {
- LogEntry entry = shallowEntries(fileRecords).get(1);
+ RecordBatch batch = batches(fileRecords).get(1);
int start = fileRecords.searchForOffsetWithSize(1, 0).position;
- int size = entry.sizeInBytes();
+ int size = batch.sizeInBytes();
FileRecords slice = fileRecords.read(start, size);
- assertEquals(Collections.singletonList(entry), shallowEntries(slice));
+ assertEquals(Collections.singletonList(batch), batches(slice));
FileRecords slice2 = fileRecords.read(start, size - 1);
- assertEquals(Collections.emptyList(), shallowEntries(slice2));
+ assertEquals(Collections.emptyList(), batches(slice2));
}
/**
@@ -171,11 +180,11 @@ public class FileRecordsTest {
*/
@Test
public void testTruncate() throws IOException {
- LogEntry entry = shallowEntries(fileRecords).get(0);
+ RecordBatch batch = batches(fileRecords).get(0);
int end = fileRecords.searchForOffsetWithSize(1, 0).position;
fileRecords.truncateTo(end);
- assertEquals(Collections.singletonList(entry), shallowEntries(fileRecords));
- assertEquals(entry.sizeInBytes(), fileRecords.sizeInBytes());
+ assertEquals(Collections.singletonList(batch), batches(fileRecords));
+ assertEquals(batch.sizeInBytes(), fileRecords.sizeInBytes());
}
/**
@@ -274,14 +283,14 @@ public class FileRecordsTest {
@Test
public void testPreallocateClearShutdown() throws IOException {
File temp = tempFile();
- FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
- set.append(MemoryRecords.withRecords(records));
+ FileRecords fileRecords = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
+ append(fileRecords, values);
- int oldPosition = (int) set.channel().position();
- int oldSize = set.sizeInBytes();
- assertEquals(fileRecords.sizeInBytes(), oldPosition);
- assertEquals(fileRecords.sizeInBytes(), oldSize);
- set.close();
+ int oldPosition = (int) fileRecords.channel().position();
+ int oldSize = fileRecords.sizeInBytes();
+ assertEquals(this.fileRecords.sizeInBytes(), oldPosition);
+ assertEquals(this.fileRecords.sizeInBytes(), oldSize);
+ fileRecords.close();
File tempReopen = new File(temp.getAbsolutePath());
FileRecords setReopen = FileRecords.open(tempReopen, true, 512 * 1024 * 1024, true);
@@ -295,104 +304,106 @@ public class FileRecordsTest {
@Test
public void testFormatConversionWithPartialMessage() throws IOException {
- LogEntry entry = shallowEntries(fileRecords).get(1);
+ RecordBatch batch = batches(fileRecords).get(1);
int start = fileRecords.searchForOffsetWithSize(1, 0).position;
- int size = entry.sizeInBytes();
+ int size = batch.sizeInBytes();
FileRecords slice = fileRecords.read(start, size - 1);
- Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
- assertTrue("No message should be there", shallowEntries(messageV0).isEmpty());
+ Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0);
+ assertTrue("No message should be there", batches(messageV0).isEmpty());
assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
}
@Test
- public void testConvertNonCompressedToMagic1() throws IOException {
- List<LogEntry> entries = Arrays.asList(
- LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())),
- LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes())));
- MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries);
-
- // Up conversion. In reality we only do down conversion, but up conversion should work as well.
- // up conversion for non-compressed messages
- try (FileRecords fileRecords = FileRecords.open(tempFile())) {
- fileRecords.append(records);
- fileRecords.flush();
- Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
- verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
- }
- }
-
- @Test
- public void testConvertCompressedToMagic1() throws IOException {
- List<LogEntry> entries = Arrays.asList(
- LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())),
- LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes())));
- MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries);
-
- // up conversion for compressed messages
- try (FileRecords fileRecords = FileRecords.open(tempFile())) {
- fileRecords.append(records);
- fileRecords.flush();
- Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
- verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
- }
- }
-
- @Test
- public void testConvertNonCompressedToMagic0() throws IOException {
- List<LogEntry> entries = Arrays.asList(
- LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())),
- LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes())));
- MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries);
-
- // down conversion for non-compressed messages
- try (FileRecords fileRecords = FileRecords.open(tempFile())) {
- fileRecords.append(records);
- fileRecords.flush();
- Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
- verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
- }
+ public void testConversion() throws IOException {
+ doTestConversion(CompressionType.NONE, RecordBatch.MAGIC_VALUE_V0);
+ doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V0);
+ doTestConversion(CompressionType.NONE, RecordBatch.MAGIC_VALUE_V1);
+ doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V1);
+ doTestConversion(CompressionType.NONE, RecordBatch.MAGIC_VALUE_V2);
+ doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V2);
}
- @Test
- public void testConvertCompressedToMagic0() throws IOException {
- List<LogEntry> entries = Arrays.asList(
- LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())),
- LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes())));
- MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries);
+ private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
+ List<Long> offsets = Arrays.asList(0L, 2L, 3L, 9L, 11L, 15L);
+ List<SimpleRecord> records = Arrays.asList(
+ new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
+ new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
+ new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
+ new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
+ new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
+ new SimpleRecord(6L, "k6".getBytes(), "goodbye forever".getBytes()));
+
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+ TimestampType.CREATE_TIME, 0L);
+ for (int i = 0; i < 2; i++)
+ builder.appendWithOffset(offsets.get(i), records.get(i));
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+ TimestampType.CREATE_TIME, 0L);
+ for (int i = 2; i < 4; i++)
+ builder.appendWithOffset(offsets.get(i), records.get(i));
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
+ TimestampType.CREATE_TIME, 0L);
+ for (int i = 4; i < 6; i++)
+ builder.appendWithOffset(offsets.get(i), records.get(i));
+ builder.close();
+
+ buffer.flip();
- // down conversion for compressed messages
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
- fileRecords.append(records);
+ fileRecords.append(MemoryRecords.readableRecords(buffer));
fileRecords.flush();
- Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
- verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
+ Records convertedRecords = fileRecords.downConvert(toMagic);
+ verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
}
}
- private void verifyConvertedMessageSet(List<LogEntry> initialEntries, Records convertedRecords, byte magicByte) {
+ private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
+ List<Long> initialOffsets,
+ Records convertedRecords,
+ CompressionType compressionType,
+ byte magicByte) {
int i = 0;
- for (LogEntry logEntry : deepEntries(convertedRecords)) {
- assertEquals("magic byte should be " + magicByte, magicByte, logEntry.record().magic());
- assertEquals("offset should not change", initialEntries.get(i).offset(), logEntry.offset());
- assertEquals("key should not change", initialEntries.get(i).record().key(), logEntry.record().key());
- assertEquals("payload should not change", initialEntries.get(i).record().value(), logEntry.record().value());
- i += 1;
+ for (RecordBatch batch : convertedRecords.batches()) {
+ assertTrue("Magic byte should be lower than or equal to " + magicByte, batch.magic() <= magicByte);
+ assertEquals("Compression type should not be affected by conversion", compressionType, batch.compressionType());
+ for (Record record : batch) {
+ assertTrue("Inner record should have magic " + batch.magic(), record.hasMagic(batch.magic())); // report the magic actually checked, which may be below magicByte
+ assertEquals("Offset should not change", initialOffsets.get(i).longValue(), record.offset());
+ assertEquals("Key should not change", initialRecords.get(i).key(), record.key());
+ assertEquals("Value should not change", initialRecords.get(i).value(), record.value());
+ if (batch.magic() > RecordBatch.MAGIC_VALUE_V0) // timestamps are only compared for batches above magic v0
+ assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
+ i += 1;
+ }
}
+ assertEquals(initialOffsets.size(), i); // every input record must have been seen exactly once
}
- private static List<LogEntry> shallowEntries(Records buffer) {
- return TestUtils.toList(buffer.shallowEntries());
+ private static List<RecordBatch> batches(Records buffer) { // test helper: collects buffer.batches() into a List via TestUtils.toList
+ return TestUtils.toList(buffer.batches());
}
- private static List<LogEntry> deepEntries(Records buffer) {
- return TestUtils.toList(buffer.deepEntries());
+ private FileRecords createFileRecords(byte[][] values) throws IOException { // opens a FileRecords on tempFile(), fills it through append() and returns it
+ FileRecords fileRecords = FileRecords.open(tempFile());
+ append(fileRecords, values);
+ return fileRecords; // returned without being closed by this helper
}
- private FileRecords createFileRecords(Record ... records) throws IOException {
- FileRecords fileRecords = FileRecords.open(tempFile());
- fileRecords.append(MemoryRecords.withRecords(records));
+ private void append(FileRecords fileRecords, byte[][] values) throws IOException { // appends each value as its own built batch, then flushes
+ long offset = 0L; // offsets are handed out sequentially and restart at 0 on every call
+ for (byte[] value : values) {
+ ByteBuffer buffer = ByteBuffer.allocate(128); // NOTE(review): assumes one record fits in 128 bytes or that the builder can grow - confirm
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
+ CompressionType.NONE, TimestampType.CREATE_TIME, offset);
+ builder.appendWithOffset(offset++, System.currentTimeMillis(), null, value); // wall-clock timestamp; third argument (presumably the key) is null
+ fileRecords.append(builder.build());
+ }
fileRecords.flush();
- return fileRecords;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java
new file mode 100644
index 0000000..9480c60
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/LegacyRecordTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(value = Parameterized.class)
+public class LegacyRecordTest { // one test instance is created per parameter row returned by data()
+
+ private final byte magic;
+ private final long timestamp;
+ private final ByteBuffer key;
+ private final ByteBuffer value;
+ private final CompressionType compression;
+ private final TimestampType timestampType;
+ private final LegacyRecord record; // record under test, built once from the fields above
+
+ public LegacyRecordTest(byte magic, long timestamp, byte[] key, byte[] value, CompressionType compression) {
+ this.magic = magic;
+ this.timestamp = timestamp;
+ this.timestampType = TimestampType.CREATE_TIME; // fixed for every parameter row
+ this.key = key == null ? null : ByteBuffer.wrap(key);
+ this.value = value == null ? null : ByteBuffer.wrap(value);
+ this.compression = compression;
+ this.record = LegacyRecord.create(magic, timestamp, key, value, compression, timestampType);
+ }
+
+ @Test
+ public void testFields() { // each accessor must report what the record was created with
+ assertEquals(compression, record.compressionType());
+ assertEquals(key != null, record.hasKey());
+ assertEquals(key, record.key());
+ if (key != null)
+ assertEquals(key.limit(), record.keySize());
+ assertEquals(magic, record.magic());
+ assertEquals(value, record.value());
+ if (value != null)
+ assertEquals(value.limit(), record.valueSize());
+ if (magic > 0) {
+ assertEquals(timestamp, record.timestamp());
+ assertEquals(timestampType, record.timestampType());
+ } else { // magic 0 is expected to report no timestamp and no timestamp type
+ assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
+ assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
+ }
+ }
+
+ @Test
+ public void testChecksum() { // stored checksum must match the recomputed one, and a corrupted byte must invalidate the record
+ assertEquals(record.checksum(), record.computeChecksum());
+
+ byte attributes = LegacyRecord.computeAttributes(magic, this.compression, TimestampType.CREATE_TIME);
+ assertEquals(record.checksum(), LegacyRecord.computeChecksum(
+ magic,
+ attributes,
+ this.timestamp,
+ this.key == null ? null : this.key.array(),
+ this.value == null ? null : this.value.array()
+ ));
+ assertTrue(record.isValid());
+ for (int i = LegacyRecord.CRC_OFFSET + LegacyRecord.CRC_LENGTH; i < record.sizeInBytes(); i++) { // every byte position after the CRC field
+ LegacyRecord copy = copyOf(record);
+ copy.buffer().put(i, (byte) 69); // arbitrary corruption value; assumes no serialized byte already equals 69 - TODO confirm
+ assertFalse(copy.isValid());
+ try {
+ copy.ensureValid();
+ fail("Should fail the above test.");
+ } catch (InvalidRecordException e) {
+ // this is good
+ }
+ }
+ }
+
+ private LegacyRecord copyOf(LegacyRecord record) { // returns a record backed by a fresh buffer holding the same bytes as the argument
+ ByteBuffer buffer = ByteBuffer.allocate(record.sizeInBytes());
+ buffer.put(record.buffer()); // copy FROM the record INTO the new buffer; the reverse direction would overwrite the record with zeros
+ buffer.rewind();
+ record.buffer().rewind(); // put() advances the source position, so reset it
+ return new LegacyRecord(buffer);
+ }
+
+ @Test
+ public void testEquality() { // a byte-for-byte copy must compare equal to the original
+ assertEquals(record, copyOf(record));
+ }
+
+ @Parameters
+ public static Collection<Object[]> data() { // cartesian product of magics, timestamps, keys, values and compression types
+ byte[] payload = new byte[1000];
+ Arrays.fill(payload, (byte) 1); // 1000 bytes of 0x01, used as the large key and large value
+ List<Object[]> values = new ArrayList<>();
+ for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1))
+ for (long timestamp : Arrays.asList(RecordBatch.NO_TIMESTAMP, 0L, 1L))
+ for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
+ for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
+ for (CompressionType compression : CompressionType.values())
+ values.add(new Object[] {magic, timestamp, key, value, compression});
+ return values;
+ }
+
+}
|