kafka-commits mailing list archives

From: j...@apache.org
Subject: [2/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives
Date: Tue, 13 Dec 2016 18:41:29 GMT
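
This patch migrates the core log tests from the old kafka.message classes (Message, ByteBufferMessageSet, MessageSet) to the client-side record classes in org.apache.kafka.common.record (Record, MemoryRecords, FileRecords). For orientation, here is a minimal sketch of the construction pattern the diff below swaps in; it is illustrative only, not part of the patch, and the wrapper object is made up, but the calls (Record.create, MemoryRecords.withRecords, CompressionType) are the ones used throughout the changes.

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record}

    object RecordsMigrationSketch {
      // Old API: new ByteBufferMessageSet(NoCompressionCodec, new Message("test".getBytes))
      // New API: build a MemoryRecords directly from client-side Record instances.
      def singleton(value: Array[Byte]): MemoryRecords =
        MemoryRecords.withRecords(CompressionType.NONE, Record.create(value))
    }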
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index d18719a..fcf9c89 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,15 +22,16 @@ import java.util.Properties
 
 import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
 import kafka.api.ApiVersion
-import kafka.common.LongRef
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
-import kafka.message._
 import kafka.utils._
 import kafka.server.KafkaConfig
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.JavaConverters._
+
 class LogTest extends JUnitSuite {
 
   val tmpDir = TestUtils.tempDir()
@@ -63,7 +64,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testTimeBasedLogRoll() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
 
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentMsProp, (1 * 60 * 60L): java.lang.Long)
@@ -91,7 +92,7 @@ class LogTest extends JUnitSuite {
 
     // Append a message with a timestamp to a segment whose first message does not have a timestamp.
     val setWithTimestamp =
-      TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1)
+      TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1)
     log.append(setWithTimestamp)
     assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments)
 
@@ -105,14 +106,14 @@ class LogTest extends JUnitSuite {
     log.append(setWithTimestamp)
     assertEquals("Log should not roll because the roll should depend on timestamp of the first message.", 5, log.numberOfSegments)
 
-    val setWithExpiredTimestamp = TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds)
+    val setWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     log.append(setWithExpiredTimestamp)
     assertEquals("Log should roll because the timestamp in the message should make the log segment expire.", 6, log.numberOfSegments)
 
     val numSegments = log.numberOfSegments
     time.sleep(log.config.segmentMs + 1)
-    log.append(new ByteBufferMessageSet())
-    assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments)
+    log.append(MemoryRecords.withLogEntries())
+    assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
   /**
@@ -121,7 +122,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testTimeBasedLogRollJitter() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val maxJitter = 20 * 60L
 
     val logProps = new Properties()
@@ -149,7 +150,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testSizeBasedLogRoll() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
@@ -176,7 +177,7 @@ class LogTest extends JUnitSuite {
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
     val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time)
-    log.append(TestUtils.singleMessageSet("test".getBytes))
+    log.append(TestUtils.singletonRecords("test".getBytes))
   }
 
   /**
@@ -189,16 +190,17 @@ class LogTest extends JUnitSuite {
     // We need to use magic value 1 here because the test is sensitive to message size.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
-    val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
+    val records = (0 until 100 by 2).map(id => Record.create(id.toString.getBytes)).toArray
+
+    for(i <- records.indices)
+      log.append(MemoryRecords.withRecords(records(i)))
 
-    for(i <- 0 until messages.length)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
-    for(i <- 0 until messages.length) {
-      val read = log.read(i, 100, Some(i+1)).messageSet.head
+    for(i <- records.indices) {
+      val read = log.read(i, 100, Some(i+1)).records.shallowIterator.next()
       assertEquals("Offset read should match order appended.", i, read.offset)
-      assertEquals("Message should match appended.", messages(i), read.message)
+      assertEquals("Message should match appended.", records(i), read.record)
     }
-    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size)
+    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(records.length, 100, None).records.shallowIterator.asScala.size)
   }
 
   /**
@@ -211,16 +213,16 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
-    val messages = messageIds.map(id => new Message(id.toString.getBytes))
+    val records = messageIds.map(id => Record.create(id.toString.getBytes))
 
     // now test the case that we give the offsets and use non-sequential offsets
-    for(i <- 0 until messages.length)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, new LongRef(messageIds(i)), messages = messages(i)), assignOffsets = false)
+    for(i <- records.indices)
+      log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
     for(i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
-      val read = log.read(i, 100, None).messageSet.head
+      val read = log.read(i, 100, None).records.shallowIterator.next()
       assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
-      assertEquals("Message should match appended.", messages(idx), read.message)
+      assertEquals("Message should match appended.", records(idx), read.record)
     }
   }
 
@@ -238,12 +240,12 @@ class LogTest extends JUnitSuite {
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+      log.append(MemoryRecords.withRecords(Record.create("42".getBytes)))
 
     // now manually truncate off all but one message from the first segment to create a gap in the messages
     log.logSegments.head.truncateTo(1)
 
-    assertEquals("A read should now return the last message in the log", log.logEndOffset - 1, log.read(1, 200, None).messageSet.head.offset)
+    assertEquals("A read should now return the last message in the log", log.logEndOffset - 1, log.read(1, 200, None).records.shallowIterator.next().offset)
   }
 
   @Test
@@ -252,12 +254,11 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
-    val messages = messageIds.map(id => new Message(id.toString.getBytes))
+    val records = messageIds.map(id => Record.create(id.toString.getBytes))
 
     // now test the case that we give the offsets and use non-sequential offsets
-    for (i <- 0 until messages.length)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, new LongRef(messageIds(i)), messages = messages(i)),
-        assignOffsets = false)
+    for (i <- records.indices)
+      log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
 
     for (i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
@@ -265,13 +266,13 @@ class LogTest extends JUnitSuite {
         log.read(i, 1, minOneMessage = true),
         log.read(i, 100, minOneMessage = true),
         log.read(i, 100, Some(10000), minOneMessage = true)
-      ).map(_.messageSet.head)
+      ).map(_.records.shallowIterator.next())
       reads.foreach { read =>
         assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
-        assertEquals("Message should match appended.", messages(idx), read.message)
+        assertEquals("Message should match appended.", records(idx), read.record)
       }
 
-      assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).messageSet.toIndexedSeq)
+      assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = true).records.shallowIterator.asScala.toIndexedSeq)
     }
 
   }
@@ -282,15 +283,14 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
-    val messages = messageIds.map(id => new Message(id.toString.getBytes))
+    val records = messageIds.map(id => Record.create(id.toString.getBytes))
 
     // now test the case that we give the offsets and use non-sequential offsets
-    for (i <- 0 until messages.length)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, new LongRef(messageIds(i)), messages = messages(i)),
-        assignOffsets = false)
+    for (i <- records.indices)
+      log.append(MemoryRecords.withLogEntries(LogEntry.create(messageIds(i), records(i))), assignOffsets = false)
 
     for (i <- 50 until messageIds.max) {
-      assertEquals(MessageSet.Empty, log.read(i, 0).messageSet)
+      assertEquals(MemoryRecords.EMPTY, log.read(i, 0).records)
 
       // we return an incomplete message instead of an empty one for the case below
       // we use this mechanism to tell consumers of the fetch request version 2 and below that the message size is
@@ -298,9 +298,9 @@ class LogTest extends JUnitSuite {
       // in fetch request version 3, we no longer need this as we return oversized messages from the first non-empty
       // partition
       val fetchInfo = log.read(i, 1)
-      assertTrue(fetchInfo.firstMessageSetIncomplete)
-      assertTrue(fetchInfo.messageSet.isInstanceOf[FileMessageSet])
-      assertEquals(1, fetchInfo.messageSet.sizeInBytes)
+      assertTrue(fetchInfo.firstEntryIncomplete)
+      assertTrue(fetchInfo.records.isInstanceOf[FileRecords])
+      assertEquals(1, fetchInfo.records.sizeInBytes)
     }
   }
 
@@ -318,9 +318,9 @@ class LogTest extends JUnitSuite {
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
-    log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+    log.append(MemoryRecords.withRecords(Record.create("42".getBytes)))
 
-    assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes)
+    assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
 
     try {
       log.read(0, 1000)
@@ -336,7 +336,7 @@ class LogTest extends JUnitSuite {
       case _: OffsetOutOfRangeException => // This is good.
     }
 
-    assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).messageSet.sizeInBytes)
+    assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).records.sizeInBytes)
   }
 
   /**
@@ -350,21 +350,22 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val numMessages = 100
-    val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
+    val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(i.toString.getBytes))
     messageSets.foreach(log.append(_))
     log.flush
 
     /* do successive reads to ensure all our messages are there */
     var offset = 0L
     for(i <- 0 until numMessages) {
-      val messages = log.read(offset, 1024*1024).messageSet
-      assertEquals("Offsets not equal", offset, messages.head.offset)
-      assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message,
-        messages.head.message.toFormatVersion(messageSets(i).head.message.magic))
-      offset = messages.head.offset + 1
+      val messages = log.read(offset, 1024*1024).records.shallowIterator
+      val head = messages.next()
+      assertEquals("Offsets not equal", offset, head.offset)
+      assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowIterator.next().record,
+        head.record.convert(messageSets(i).shallowIterator.next().record.magic))
+      offset = head.offset + 1
     }
-    val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet
-    assertEquals("Should be no more messages", 0, lastRead.size)
+    val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records
+    assertEquals("Should be no more messages", 0, lastRead.shallowIterator.asScala.size)
 
     // check that rolling the log forced a flush of the log; the flush is async so retry in case of failure
     TestUtils.retry(1000L){
@@ -383,10 +384,10 @@ class LogTest extends JUnitSuite {
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
-    log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
-    log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
+    log.append(MemoryRecords.withRecords(CompressionType.GZIP, Record.create("hello".getBytes), Record.create("there".getBytes)))
+    log.append(MemoryRecords.withRecords(CompressionType.GZIP, Record.create("alpha".getBytes), Record.create("beta".getBytes)))
 
-    def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head)
+    def read(offset: Int) = log.read(offset, 4096).records.deepIterator
 
     /* we should always get the first message in the compressed set when reading any offset in the set */
     assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
@@ -408,7 +409,7 @@ class LogTest extends JUnitSuite {
       logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
       val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
-        log.append(TestUtils.singleMessageSet(payload = i.toString.getBytes, timestamp = time.milliseconds - 10))
+        log.append(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10))
 
       val currOffset = log.logEndOffset
       assertEquals(currOffset, messagesToAppend)
@@ -422,7 +423,7 @@ class LogTest extends JUnitSuite {
       assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
       assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                    currOffset,
-                   log.append(TestUtils.singleMessageSet("hello".getBytes)).firstOffset)
+                   log.append(TestUtils.singletonRecords("hello".getBytes)).firstOffset)
 
       // cleanup the log
       log.delete()
@@ -435,7 +436,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testMessageSetSizeCheck() {
-    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
+    val messageSet = MemoryRecords.withRecords(Record.create("You".getBytes), Record.create("bethe".getBytes))
     // append messages to log
     val configSegmentSize = messageSet.sizeInBytes - 1
     val logProps = new Properties()
@@ -454,17 +455,17 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testCompactedTopicConstraints() {
-    val keyedMessage = new Message(bytes = "this message has a key".getBytes, key = "and here it is".getBytes, Message.NoTimestamp, Message.CurrentMagicValue)
-    val anotherKeyedMessage = new Message(bytes = "this message also has a key".getBytes, key ="another key".getBytes, Message.NoTimestamp, Message.CurrentMagicValue)
-    val unkeyedMessage = new Message(bytes = "this message does not have a key".getBytes)
+    val keyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, Record.NO_TIMESTAMP, "and here it is".getBytes, "this message has a key".getBytes)
+    val anotherKeyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, Record.NO_TIMESTAMP, "another key".getBytes, "this message also has a key".getBytes)
+    val unkeyedMessage = Record.create("this message does not have a key".getBytes)
 
-    val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage)
-    val messageSetWithOneUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage)
-    val messageSetWithCompressedKeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage)
-    val messageSetWithCompressedUnkeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage, unkeyedMessage)
+    val messageSetWithUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage, keyedMessage)
+    val messageSetWithOneUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage)
+    val messageSetWithCompressedKeyedMessage = MemoryRecords.withRecords(CompressionType.GZIP, keyedMessage)
+    val messageSetWithCompressedUnkeyedMessage = MemoryRecords.withRecords(CompressionType.GZIP, keyedMessage, unkeyedMessage)
 
-    val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage)
-    val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage)
+    val messageSetWithKeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage)
+    val messageSetWithKeyedMessages = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage, anotherKeyedMessage)
 
     val logProps = new Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
@@ -502,8 +503,8 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testMessageSizeCheck() {
-    val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
-    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change (I need more bytes)".getBytes))
+    val first = MemoryRecords.withRecords(CompressionType.NONE, Record.create("You".getBytes), Record.create("bethe".getBytes))
+    val second = MemoryRecords.withRecords(CompressionType.NONE, Record.create("change (I need more bytes)".getBytes))
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
@@ -537,7 +538,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(messageSize),
+      log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
         timestamp = time.milliseconds + i * 10))
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
     val lastIndexOffset = log.activeSegment.index.lastOffset
@@ -585,7 +586,7 @@ class LogTest extends JUnitSuite {
     val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
 
     val messages = (0 until numMessages).map { i =>
-      new ByteBufferMessageSet(NoCompressionCodec, new LongRef(100 + i), new Message(i.toString.getBytes(), time.milliseconds + i, Message.MagicValue_V1))
+      MemoryRecords.withLogEntries(LogEntry.create(100 + i, Record.create(Record.MAGIC_VALUE_V1, time.milliseconds + i, i.toString.getBytes())))
     }
     messages.foreach(log.append(_, assignOffsets = false))
     val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries }
@@ -608,7 +609,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
+      log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
@@ -623,7 +624,7 @@ class LogTest extends JUnitSuite {
     assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
     for(i <- 0 until numMessages) {
-      assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
+      assertEquals(i, log.read(i, 100, None).records.shallowIterator.next().offset)
       if (i == 0)
         assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
       else
@@ -647,7 +648,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
+      log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
@@ -676,7 +677,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
+      log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
@@ -699,7 +700,7 @@ class LogTest extends JUnitSuite {
     log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
-      assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
+      assertEquals(i, log.read(i, 100, None).records.shallowIterator.next().offset)
       if (i == 0)
         assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
       else
@@ -713,7 +714,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testTruncateTo() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
@@ -770,7 +771,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testIndexResizingAtTruncation() {
-    val setSize = TestUtils.singleMessageSet(payload = "test".getBytes).sizeInBytes
+    val setSize = TestUtils.singletonRecords(value = "test".getBytes).sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
     val logProps = new Properties()
@@ -781,12 +782,12 @@ class LogTest extends JUnitSuite {
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
-      log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i))
+      log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i))
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     time.sleep(msgPerSeg)
     for (i<- 1 to msgPerSeg)
-      log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i))
+      log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i))
     assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
     val expectedEntries = msgPerSeg - 1
 
@@ -800,7 +801,7 @@ class LogTest extends JUnitSuite {
 
     time.sleep(msgPerSeg)
     for (i<- 1 to msgPerSeg)
-      log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i))
+      log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i))
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
   }
 
@@ -814,7 +815,7 @@ class LogTest extends JUnitSuite {
     val bogusIndex2 = Log.indexFilename(logDir, 5)
     val bogusTimeIndex2 = Log.timeIndexFilename(logDir, 5)
 
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
@@ -842,7 +843,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testReopenThenTruncate() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
@@ -875,7 +876,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAsyncDelete() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val asyncDeleteMs = 1000
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
@@ -921,7 +922,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testOpenDeletesObsoleteFiles() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
@@ -957,10 +958,10 @@ class LogTest extends JUnitSuite {
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
-    log.append(new ByteBufferMessageSet(new Message(bytes = null)))
-    val messageSet = log.read(0, 4096, None).messageSet
-    assertEquals(0, messageSet.head.offset)
-    assertTrue("Message payload should be null.", messageSet.head.message.isNull)
+    log.append(MemoryRecords.withRecords(Record.create(null)))
+    val head = log.read(0, 4096, None).records.shallowIterator().next()
+    assertEquals(0, head.offset)
+    assertTrue("Message payload should be null.", head.record.hasNullValue)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
@@ -970,9 +971,9 @@ class LogTest extends JUnitSuite {
       recoveryPoint = 0L,
       time.scheduler,
       time)
-    val messages = (0 until 2).map(id => new Message(id.toString.getBytes)).toArray
-    messages.foreach(message => log.append(new ByteBufferMessageSet(message)))
-    val invalidMessage = new ByteBufferMessageSet(new Message(1.toString.getBytes))
+    val messages = (0 until 2).map(id => Record.create(id.toString.getBytes)).toArray
+    messages.foreach(record => log.append(MemoryRecords.withRecords(record)))
+    val invalidMessage = MemoryRecords.withRecords(Record.create(1.toString.getBytes))
     log.append(invalidMessage, assignOffsets = false)
   }
 
@@ -984,7 +985,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
     val config = LogConfig(logProps)
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val recoveryPoint = 50L
     for (_ <- 0 until 50) {
       // create a log and write some messages to it
@@ -997,7 +998,7 @@ class LogTest extends JUnitSuite {
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
         log.append(set)
-      val messages = log.logSegments.flatMap(_.log.iterator.toList)
+      val messages = log.logSegments.flatMap(_.log.deepIterator.asScala.toList)
       log.close()
 
       // corrupt index and log by appending random bytes
@@ -1007,7 +1008,8 @@ class LogTest extends JUnitSuite {
       // attempt recovery
       log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
       assertEquals(numMessages, log.logEndOffset)
-      assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList))
+      assertEquals("Messages in the log after recovery should be the same.", messages,
+        log.logSegments.flatMap(_.log.deepIterator.asScala.toList))
       Utils.delete(logDir)
     }
   }
@@ -1020,7 +1022,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     val config = LogConfig(logProps)
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val parentLogDir = logDir.getParentFile
     assertTrue("Data directory %s must exist", parentLogDir.isDirectory)
     val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile)
@@ -1121,7 +1123,7 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testDeleteOldSegmentsMethod() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
@@ -1154,7 +1156,7 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldDeleteSizeBasedSegments() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
 
     // append some messages to create some segments
@@ -1167,7 +1169,7 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
+    val set = TestUtils.singletonRecords("test".getBytes)
     val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 15)
 
     // append some messages to create some segments
@@ -1180,7 +1182,7 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() {
-    val set = TestUtils.singleMessageSet("test".getBytes, timestamp = 10)
+    val set = TestUtils.singletonRecords("test".getBytes, timestamp = 10)
     val log = createLog(set.sizeInBytes, retentionMs = 10000)
 
     // append some messages to create some segments
@@ -1193,7 +1195,7 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() {
-    val set = TestUtils.singleMessageSet("test".getBytes, timestamp = time.milliseconds)
+    val set = TestUtils.singletonRecords("test".getBytes, timestamp = time.milliseconds)
     val log = createLog(set.sizeInBytes, retentionMs = 10000000)
 
     // append some messages to create some segments
@@ -1206,7 +1208,7 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() {
-    val set = TestUtils.singleMessageSet("test".getBytes, key = "test".getBytes(), timestamp = 10L)
+    val set = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
     val log = createLog(set.sizeInBytes,
       retentionMs = 10000,
       cleanupPolicy = "compact")
@@ -1225,7 +1227,7 @@ class LogTest extends JUnitSuite {
 
   @Test
   def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() {
-    val set = TestUtils.singleMessageSet("test".getBytes, key = "test".getBytes,timestamp = 10L)
+    val set = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes,timestamp = 10L)
     val log = createLog(set.sizeInBytes,
       retentionMs = 10000,
       cleanupPolicy = "compact,delete")

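In the LogTest changes above, reads no longer return a MessageSet; they return records whose entries are walked with shallowIterator (one entry per wrapper or uncompressed message) or deepIterator (individual records after decompression), converted to Scala iterators via JavaConverters. Below is a small sketch of that read pattern, restricted to calls that appear in the diff above; the object and method names are illustrative, not part of the patch.

    import org.apache.kafka.common.record.MemoryRecords
    import scala.collection.JavaConverters._

    object RecordsReadSketch {
      // Offset of the first entry, as in log.read(...).records.shallowIterator.next().offset above.
      def firstOffset(records: MemoryRecords): Long =
        records.shallowIterator.next().offset

      // Number of individual records once compressed wrappers are expanded, as in the deepIterator usage above.
      def deepCount(records: MemoryRecords): Int =
        records.deepIterator.asScala.size
    }
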
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
new file mode 100644
index 0000000..72c5b16
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.nio.ByteBuffer
+
+import kafka.common.LongRef
+import kafka.message._
+import org.apache.kafka.common.errors.InvalidTimestampException
+import org.apache.kafka.common.record._
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+import scala.collection.JavaConverters._
+
+class LogValidatorTest extends JUnitSuite {
+
+  @Test
+  def testLogAppendTimeNonCompressed() {
+    val now = System.currentTimeMillis()
+    // The timestamps should be overwritten
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = 0L, codec = CompressionType.NONE)
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(0),
+      now = now,
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+      messageTimestampDiffMaxMs = 1000L)
+    val validatedRecords = validatedResults.validatedRecords
+    assertEquals("number of messages should not change", records.deepIterator.asScala.size, validatedRecords.deepIterator.asScala.size)
+    validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+    assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
+    assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
+  }
+
+  @Test
+  def testLogAppendTimeWithRecompression() {
+    val now = System.currentTimeMillis()
+    // The timestamps should be overwritten
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(0),
+      now = now,
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+      messageTimestampDiffMaxMs = 1000L)
+    val validatedRecords = validatedResults.validatedRecords
+
+    assertEquals("number of messages should not change", records.deepIterator.asScala.size, validatedRecords.deepIterator.asScala.size)
+    validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+    assertTrue("MessageSet should still be valid", validatedRecords.shallowIterator.next().record.isValid)
+    assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
+    assertEquals(s"The offset of max timestamp should be ${records.deepIterator.asScala.size - 1}",
+      records.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged)
+  }
+
+  @Test
+  def testLogAppendTimeWithoutRecompression() {
+    val now = System.currentTimeMillis()
+    // The timestamps should be overwritten
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1,
+      timestamp = 0L, codec = CompressionType.GZIP)
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(0),
+      now = now,
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+      messageTimestampDiffMaxMs = 1000L)
+    val validatedRecords = validatedResults.validatedRecords
+
+    assertEquals("number of messages should not change", records.deepIterator.asScala.size,
+      validatedRecords.deepIterator.asScala.size)
+    validatedRecords.deepIterator.asScala.foreach(logEntry => validateLogAppendTime(now, logEntry.record))
+    assertTrue("MessageSet should still be valid", validatedRecords.shallowIterator.next().record.isValid)
+    assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
+    assertEquals(s"The offset of max timestamp should be ${records.deepIterator.asScala.size - 1}",
+      records.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
+  }
+
+  @Test
+  def testCreateTimeNonCompressed() {
+    val now = System.currentTimeMillis()
+    val timestampSeq = Seq(now - 1, now + 1, now)
+    val records =
+      MemoryRecords.withRecords(CompressionType.NONE,
+        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes),
+        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes),
+        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes))
+
+    val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(0),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 1000L)
+    val validatedRecords = validatingResults.validatedRecords
+
+    var i = 0
+    for (logEntry <- validatedRecords.deepIterator.asScala) {
+      logEntry.record.ensureValid()
+      assertEquals(logEntry.record.timestamp, timestampSeq(i))
+      assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME)
+      i += 1
+    }
+    assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
+    assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.shallowOffsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
+  }
+
+  @Test
+  def testCreateTimeCompressed() {
+    val now = System.currentTimeMillis()
+    val timestampSeq = Seq(now - 1, now + 1, now)
+    val records =
+      MemoryRecords.withRecords(CompressionType.GZIP,
+        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes),
+        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes),
+        Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes))
+
+    val validatedResults =
+      LogValidator.validateMessagesAndAssignOffsets(records,
+        offsetCounter = new LongRef(0),
+        now = System.currentTimeMillis(),
+        sourceCodec = DefaultCompressionCodec,
+        targetCodec = DefaultCompressionCodec,
+        messageFormatVersion = Record.MAGIC_VALUE_V1,
+        messageTimestampType = TimestampType.CREATE_TIME,
+        messageTimestampDiffMaxMs = 1000L)
+    val validatedRecords = validatedResults.validatedRecords
+
+    var i = 0
+    for (logEntry <- validatedRecords.deepIterator.asScala) {
+      logEntry.record.ensureValid()
+      assertEquals(logEntry.record.timestamp, timestampSeq(i))
+      assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME)
+      i += 1
+    }
+    assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatedResults.maxTimestamp)
+    assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepIterator.asScala.size - 1}",
+      validatedRecords.deepIterator.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
+  }
+
+  @Test(expected = classOf[InvalidTimestampException])
+  def testInvalidCreateTimeNonCompressed() {
+    val now = System.currentTimeMillis()
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now - 1001L,
+      codec = CompressionType.NONE)
+    LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(0),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 1000L)
+  }
+
+  @Test(expected = classOf[InvalidTimestampException])
+  def testInvalidCreateTimeCompressed() {
+    val now = System.currentTimeMillis()
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now - 1001L,
+      codec = CompressionType.GZIP)
+    LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(0),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 1000L)
+  }
+  @Test
+  def testAbsoluteOffsetAssignmentNonCompressed() {
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.NONE)
+    val offset = 1234567
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V0,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testAbsoluteOffsetAssignmentCompressed() {
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+    val offset = 1234567
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V0,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testRelativeOffsetAssignmentNonCompressed() {
+    val now = System.currentTimeMillis()
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.NONE)
+    val offset = 1234567
+    checkOffsets(records, 0)
+    val messageWithOffset = LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedRecords
+    checkOffsets(messageWithOffset, offset)
+  }
+
+  @Test
+  def testRelativeOffsetAssignmentCompressed() {
+    val now = System.currentTimeMillis()
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = now, codec = CompressionType.GZIP)
+    val offset = 1234567
+    checkOffsets(records, 0)
+    val compressedMessagesWithOffset = LogValidator.validateMessagesAndAssignOffsets(
+      records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedRecords
+    checkOffsets(compressedMessagesWithOffset, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterMessageFormatConversionV0NonCompressed() {
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.NONE)
+    checkOffsets(records, 0)
+    val offset = 1234567
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterMessageFormatConversionV0Compressed() {
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+    val offset = 1234567
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V1,
+      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterMessageFormatConversionV1NonCompressed() {
+    val offset = 1234567
+    val now = System.currentTimeMillis()
+    val records = createRecords(Record.MAGIC_VALUE_V1, now, codec = CompressionType.NONE)
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = NoCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V0,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterMessageFormatConversionV1Compressed() {
+    val offset = 1234567
+    val now = System.currentTimeMillis()
+    val records = createRecords(Record.MAGIC_VALUE_V1, now, CompressionType.GZIP)
+    checkOffsets(records, 0)
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      messageFormatVersion = Record.MAGIC_VALUE_V0,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+  }
+
+  @Test(expected = classOf[InvalidRecordException])
+  def testInvalidInnerMagicVersion(): Unit = {
+    val offset = 1234567
+    val records = recordsWithInvalidInnerMagic(offset)
+    LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = SnappyCompressionCodec,
+      targetCodec = SnappyCompressionCodec,
+      messageTimestampType = TimestampType.CREATE_TIME,
+      messageTimestampDiffMaxMs = 5000L)
+  }
+
+  private def createRecords(magicValue: Byte = Message.CurrentMagicValue,
+                              timestamp: Long = Message.NoTimestamp,
+                              codec: CompressionType = CompressionType.NONE): MemoryRecords = {
+    if (magicValue == Record.MAGIC_VALUE_V0) {
+      MemoryRecords.withRecords(
+        codec,
+        Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "hello".getBytes),
+        Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "there".getBytes),
+        Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "beautiful".getBytes))
+    } else {
+      MemoryRecords.withRecords(
+        codec,
+        Record.create(Record.MAGIC_VALUE_V1, timestamp, "hello".getBytes),
+        Record.create(Record.MAGIC_VALUE_V1, timestamp, "there".getBytes),
+        Record.create(Record.MAGIC_VALUE_V1, timestamp, "beautiful".getBytes))
+    }
+  }
+
+  /* check that offsets are assigned consecutively from the given base offset */
+  private def checkOffsets(records: MemoryRecords, baseOffset: Long) {
+    assertTrue("Message set should not be empty", records.deepIterator.asScala.nonEmpty)
+    var offset = baseOffset
+    for (entry <- records.deepIterator.asScala) {
+      assertEquals("Unexpected offset in message set iterator", offset, entry.offset)
+      offset += 1
+    }
+  }
+
+  private def recordsWithInvalidInnerMagic(initialOffset: Long): MemoryRecords = {
+    val records = (0 until 20).map(id =>
+      Record.create(Record.MAGIC_VALUE_V0,
+        Record.NO_TIMESTAMP,
+        id.toString.getBytes,
+        id.toString.getBytes))
+
+    val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16))
+    val builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.GZIP,
+      TimestampType.CREATE_TIME)
+
+    var offset = initialOffset
+    records.foreach { record =>
+      builder.appendUnchecked(offset, record)
+      offset += 1
+    }
+
+    builder.build()
+  }
+
+  def validateLogAppendTime(now: Long, record: Record) {
+    record.ensureValid()
+    assertEquals(s"Timestamp of message $record should be $now", now, record.timestamp)
+    assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType)
+  }
+
+}

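The new LogValidatorTest above replaces the validation tests that are removed from ByteBufferMessageSetTest further down: validation now runs over MemoryRecords through LogValidator.validateMessagesAndAssignOffsets rather than as a method on ByteBufferMessageSet. Below is a condensed sketch of that call shape with the same parameters the tests use; the wrapper object is hypothetical, and it assumes LogValidator lives in kafka.log alongside the test, as the unqualified call in the new file suggests.

    import kafka.common.LongRef
    import kafka.log.LogValidator
    import kafka.message.NoCompressionCodec
    import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType}

    object ValidatorSketch {
      // Assign LOG_APPEND_TIME timestamps and offsets starting at 0, as testLogAppendTimeNonCompressed does.
      // The result exposes validatedRecords, maxTimestamp, shallowOffsetOfMaxTimestamp and
      // messageSizeMaybeChanged, which the tests above assert on.
      def assignLogAppendTime(records: MemoryRecords, now: Long) =
        LogValidator.validateMessagesAndAssignOffsets(records,
          offsetCounter = new LongRef(0),
          now = now,
          sourceCodec = NoCompressionCodec,
          targetCodec = NoCompressionCodec,
          messageFormatVersion = Record.MAGIC_VALUE_V1,
          messageTimestampType = TimestampType.LOG_APPEND_TIME,
          messageTimestampDiffMaxMs = 1000L)
    }
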
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index 476a577..bd3ed68 100644
--- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -18,13 +18,11 @@
 package kafka.message
 
 import java.nio.ByteBuffer
-import java.nio.channels.{FileChannel, GatheringByteChannel}
-import java.nio.file.StandardOpenOption
+import java.nio.channels.GatheringByteChannel
 
 import org.junit.Assert._
 import kafka.utils.TestUtils._
-import kafka.log.FileMessageSet
-import kafka.utils.TestUtils
+import org.apache.kafka.common.record.FileRecords
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 
@@ -94,7 +92,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
   @Test
   def testWriteToChannelThatConsumesPartially() {
     val bytesToConsumePerBuffer = 50
-    val messages = (0 until 10).map(_ => new Message(TestUtils.randomString(100).getBytes))
+    val messages = (0 until 10).map(_ => new Message(randomString(100).getBytes))
     val messageSet = createMessageSet(messages)
     val messageSetSize = messageSet.sizeInBytes
 
@@ -119,15 +117,15 @@ trait BaseMessageSetTestCases extends JUnitSuite {
     // do the write twice to ensure the message set is restored to its original state
     for (_ <- 0 to 1) {
       val file = tempFile()
-      val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE)
+      val fileRecords = FileRecords.open(file, true)
       try {
-        val written = write(channel)
+        val written = write(fileRecords.channel)
+        fileRecords.resize() // resize since we wrote to the channel directly
+
         assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
-        val newSet = new FileMessageSet(file, channel)
-        checkEquals(set.iterator, newSet.iterator)
-      } finally channel.close()
+        checkEquals(set.asRecords.deepIterator, fileRecords.deepIterator())
+      } finally fileRecords.close()
     }
   }
   
 }
-

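BaseMessageSetTestCases now verifies channel writes by reopening the data as a FileRecords (the client-side replacement for FileMessageSet) instead of wrapping the channel in a FileMessageSet. Below is a small sketch of that reopen-and-iterate pattern, limited to the calls visible in the hunk above; the helper itself is hypothetical.

    import java.io.File
    import org.apache.kafka.common.record.FileRecords

    object FileRecordsSketch {
      // Open a mutable FileRecords over an existing file, as the test does with FileRecords.open(file, true),
      // and count the deep (decompressed) records it contains.
      def deepCount(file: File): Int = {
        val fileRecords = FileRecords.open(file, true)
        try {
          var n = 0
          val it = fileRecords.deepIterator()
          while (it.hasNext) { it.next(); n += 1 }
          n
        } finally fileRecords.close()
      }
    }
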
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index e2cfb87..5e22433 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -17,13 +17,9 @@
 
 package kafka.message
 
-import java.io.DataOutputStream
 import java.nio._
 
-import kafka.common.LongRef
 import kafka.utils.TestUtils
-import org.apache.kafka.common.errors.InvalidTimestampException
-import org.apache.kafka.common.record.TimestampType
 import org.junit.Assert._
 import org.junit.Test
 
@@ -151,295 +147,6 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     assertEquals("second offset should be 2", 2L, iter.next().offset)
   }
 
-  @Test
-  def testLogAppendTime() {
-    val now = System.currentTimeMillis()
-    // The timestamps should be overwritten
-    val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = NoCompressionCodec)
-    val compressedMessagesWithRecompresion = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec)
-    val compressedMessagesWithoutRecompression =
-      getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = DefaultCompressionCodec)
-
-    val validatingResults = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                      now = now,
-                                                                      sourceCodec = NoCompressionCodec,
-                                                                      targetCodec = NoCompressionCodec,
-                                                                      messageFormatVersion = 1,
-                                                                      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                                      messageTimestampDiffMaxMs = 1000L)
-    val validatedMessages = validatingResults.validatedMessages
-
-    val validatingCompressedMessagesResults =
-      compressedMessagesWithRecompresion.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                          now = now,
-                                                                          sourceCodec = DefaultCompressionCodec,
-                                                                          targetCodec = DefaultCompressionCodec,
-                                                                          messageFormatVersion = 1,
-                                                                          messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                                          messageTimestampDiffMaxMs = 1000L)
-    val validatedCompressedMessages = validatingCompressedMessagesResults.validatedMessages
-
-    val validatingCompressedMessagesWithoutRecompressionResults =
-      compressedMessagesWithoutRecompression.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                              now = now,
-                                                                              sourceCodec = DefaultCompressionCodec,
-                                                                              targetCodec = DefaultCompressionCodec,
-                                                                              messageFormatVersion = 1,
-                                                                              messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                                              messageTimestampDiffMaxMs = 1000L)
-
-    val validatedCompressedMessagesWithoutRecompression = validatingCompressedMessagesWithoutRecompressionResults.validatedMessages
-
-    assertEquals("message set size should not change", messages.size, validatedMessages.size)
-    validatedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
-    assertEquals(s"Max timestamp should be $now", now, validatingResults.maxTimestamp)
-    assertEquals(s"The offset of max timestamp should be 0", 0, validatingResults.offsetOfMaxTimestamp)
-    assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
-
-    assertEquals("message set size should not change", compressedMessagesWithRecompresion.size, validatedCompressedMessages.size)
-    validatedCompressedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
-    assertTrue("MessageSet should still valid", validatedCompressedMessages.shallowIterator.next().message.isValid)
-    assertEquals(s"Max timestamp should be $now", now, validatingCompressedMessagesResults.maxTimestamp)
-    assertEquals(s"The offset of max timestamp should be ${compressedMessagesWithRecompresion.size - 1}",
-      compressedMessagesWithRecompresion.size - 1, validatingCompressedMessagesResults.offsetOfMaxTimestamp)
-    assertTrue("Message size may have been changed", validatingCompressedMessagesResults.messageSizeMaybeChanged)
-
-    assertEquals("message set size should not change", compressedMessagesWithoutRecompression.size,
-      validatedCompressedMessagesWithoutRecompression.size)
-    validatedCompressedMessagesWithoutRecompression.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
-    assertTrue("MessageSet should still valid", validatedCompressedMessagesWithoutRecompression.shallowIterator.next().message.isValid)
-    assertEquals(s"Max timestamp should be $now", now, validatingCompressedMessagesWithoutRecompressionResults.maxTimestamp)
-    assertEquals(s"The offset of max timestamp should be ${compressedMessagesWithoutRecompression.size - 1}",
-      compressedMessagesWithoutRecompression.size - 1, validatingCompressedMessagesWithoutRecompressionResults.offsetOfMaxTimestamp)
-    assertFalse("Message size should not have been changed", validatingCompressedMessagesWithoutRecompressionResults.messageSizeMaybeChanged)
-
-    def validateLogAppendTime(message: Message) {
-      message.ensureValid()
-      assertEquals(s"Timestamp of message $message should be $now", now, message.timestamp)
-      assertEquals(TimestampType.LOG_APPEND_TIME, message.timestampType)
-    }
-  }
-
-  @Test
-  def testCreateTime() {
-    val now = System.currentTimeMillis()
-    val timestampSeq = Seq(now - 1, now + 1, now)
-    val messages =
-      new ByteBufferMessageSet(NoCompressionCodec,
-                               new Message("hello".getBytes, timestamp = timestampSeq(0), magicValue = Message.MagicValue_V1),
-                               new Message("there".getBytes, timestamp = timestampSeq(1), magicValue = Message.MagicValue_V1),
-                               new Message("beautiful".getBytes, timestamp = timestampSeq(2), magicValue = Message.MagicValue_V1))
-    val compressedMessages =
-      new ByteBufferMessageSet(DefaultCompressionCodec,
-                               new Message("hello".getBytes, timestamp = timestampSeq(0), magicValue = Message.MagicValue_V1),
-                               new Message("there".getBytes, timestamp = timestampSeq(1), magicValue = Message.MagicValue_V1),
-                               new Message("beautiful".getBytes, timestamp = timestampSeq(2), magicValue = Message.MagicValue_V1))
-
-    val validatingResults = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                      now = System.currentTimeMillis(),
-                                                                      sourceCodec = NoCompressionCodec,
-                                                                      targetCodec = NoCompressionCodec,
-                                                                      messageFormatVersion = 1,
-                                                                      messageTimestampType = TimestampType.CREATE_TIME,
-                                                                      messageTimestampDiffMaxMs = 1000L)
-    val validatedMessages = validatingResults.validatedMessages
-
-    val validatingCompressedMessagesResults =
-      compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                          now = System.currentTimeMillis(),
-                                                          sourceCodec = DefaultCompressionCodec,
-                                                          targetCodec = DefaultCompressionCodec,
-                                                          messageFormatVersion = 1,
-                                                          messageTimestampType = TimestampType.CREATE_TIME,
-                                                          messageTimestampDiffMaxMs = 1000L)
-    val validatedCompressedMessages = validatingCompressedMessagesResults.validatedMessages
-
-    var i = 0
-    for (messageAndOffset <- validatedMessages) {
-      messageAndOffset.message.ensureValid()
-      assertEquals(messageAndOffset.message.timestamp, timestampSeq(i))
-      assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME)
-      i += 1
-    }
-    assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
-    assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.offsetOfMaxTimestamp)
-    assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
-    i = 0
-    for (messageAndOffset <- validatedCompressedMessages) {
-      messageAndOffset.message.ensureValid()
-      assertEquals(messageAndOffset.message.timestamp, timestampSeq(i))
-      assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME)
-      i += 1
-    }
-    assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
-    assertEquals(s"Offset of max timestamp should be ${validatedCompressedMessages.size - 1}",
-      validatedCompressedMessages.size - 1, validatingCompressedMessagesResults.offsetOfMaxTimestamp)
-    assertFalse("Message size should not have been changed", validatingCompressedMessagesResults.messageSizeMaybeChanged)
-  }
-
-  @Test
-  def testInvalidCreateTime() {
-    val now = System.currentTimeMillis()
-    val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now - 1001L, codec = NoCompressionCodec)
-    val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now - 1001L, codec = DefaultCompressionCodec)
-
-    try {
-      messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                now = System.currentTimeMillis(),
-                                                sourceCodec = NoCompressionCodec,
-                                                targetCodec = NoCompressionCodec,
-                                                messageFormatVersion = 1,
-                                                messageTimestampType = TimestampType.CREATE_TIME,
-                                                messageTimestampDiffMaxMs = 1000L)
-      fail("Should throw InvalidMessageException.")
-    } catch {
-      case _: InvalidTimestampException =>
-    }
-
-    try {
-      compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                          now = System.currentTimeMillis(),
-                                                          sourceCodec = DefaultCompressionCodec,
-                                                          targetCodec = DefaultCompressionCodec,
-                                                          messageFormatVersion = 1,
-                                                          messageTimestampType = TimestampType.CREATE_TIME,
-                                                          messageTimestampDiffMaxMs = 1000L)
-      fail("Should throw InvalidMessageException.")
-    } catch {
-      case _: InvalidTimestampException =>
-    }
-  }
-
-  @Test
-  def testAbsoluteOffsetAssignment() {
-    val messages = getMessages(magicValue = Message.MagicValue_V0, codec = NoCompressionCodec)
-    val compressedMessages = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec)
-    // check uncompressed offsets
-    checkOffsets(messages, 0)
-    val offset = 1234567
-    checkOffsets(messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                           now = System.currentTimeMillis(),
-                                                           sourceCodec = NoCompressionCodec,
-                                                           targetCodec = NoCompressionCodec,
-                                                           messageFormatVersion = 0,
-                                                           messageTimestampType = TimestampType.CREATE_TIME,
-                                                           messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
-
-    // check compressed messages
-    checkOffsets(compressedMessages, 0)
-    checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                                     now = System.currentTimeMillis(),
-                                                                     sourceCodec = DefaultCompressionCodec,
-                                                                     targetCodec = DefaultCompressionCodec,
-                                                                     messageFormatVersion = 0,
-                                                                     messageTimestampType = TimestampType.CREATE_TIME,
-                                                                     messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
-
-  }
-
-  @Test
-  def testRelativeOffsetAssignment() {
-    val now = System.currentTimeMillis()
-    val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = NoCompressionCodec)
-    val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = DefaultCompressionCodec)
-
-    // check uncompressed offsets
-    checkOffsets(messages, 0)
-    val offset = 1234567
-    val messageWithOffset = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                                      now = System.currentTimeMillis(),
-                                                                      sourceCodec = NoCompressionCodec,
-                                                                      targetCodec = NoCompressionCodec,
-                                                                      messageTimestampType = TimestampType.CREATE_TIME,
-                                                                      messageTimestampDiffMaxMs = 5000L).validatedMessages
-    checkOffsets(messageWithOffset, offset)
-
-    // check compressed messages
-    checkOffsets(compressedMessages, 0)
-    val compressedMessagesWithOffset = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                                                           now = System.currentTimeMillis(),
-                                                                                           sourceCodec = DefaultCompressionCodec,
-                                                                                           targetCodec = DefaultCompressionCodec,
-                                                                                           messageTimestampType = TimestampType.CREATE_TIME,
-                                                                                           messageTimestampDiffMaxMs = 5000L).validatedMessages
-    checkOffsets(compressedMessagesWithOffset, offset)
-  }
-
-  @Test(expected = classOf[InvalidMessageException])
-  def testInvalidInnerMagicVersion(): Unit = {
-    val offset = 1234567
-    val messages = messageSetWithInvalidInnerMagic(SnappyCompressionCodec, offset)
-    messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-      now = System.currentTimeMillis(),
-      sourceCodec = SnappyCompressionCodec,
-      targetCodec = SnappyCompressionCodec,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedMessages
-  }
-
-
-  @Test
-  def testOffsetAssignmentAfterMessageFormatConversion() {
-    // Check up conversion
-    val messagesV0 = getMessages(magicValue = Message.MagicValue_V0, codec = NoCompressionCodec)
-    val compressedMessagesV0 = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec)
-    // check uncompressed offsets
-    checkOffsets(messagesV0, 0)
-    val offset = 1234567
-    checkOffsets(messagesV0.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                             now = System.currentTimeMillis(),
-                                                             sourceCodec = NoCompressionCodec,
-                                                             targetCodec = NoCompressionCodec,
-                                                             messageFormatVersion = 1,
-                                                             messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                             messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
-
-    // check compressed messages
-    checkOffsets(compressedMessagesV0, 0)
-    checkOffsets(compressedMessagesV0.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                                       now = System.currentTimeMillis(),
-                                                                       sourceCodec = DefaultCompressionCodec,
-                                                                       targetCodec = DefaultCompressionCodec,
-                                                                       messageFormatVersion = 1,
-                                                                       messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                                       messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
-
-    // Check down conversion
-    val now = System.currentTimeMillis()
-    val messagesV1 = getMessages(Message.MagicValue_V1, now, NoCompressionCodec)
-    val compressedMessagesV1 = getMessages(Message.MagicValue_V1, now, DefaultCompressionCodec)
-
-    // check uncompressed offsets
-    checkOffsets(messagesV1, 0)
-    checkOffsets(messagesV1.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                             now = System.currentTimeMillis(),
-                                                             sourceCodec = NoCompressionCodec,
-                                                             targetCodec = NoCompressionCodec,
-                                                             messageFormatVersion = 0,
-                                                             messageTimestampType = TimestampType.CREATE_TIME,
-                                                             messageTimestampDiffMaxMs = 5000L).validatedMessages, offset)
-
-    // check compressed messages
-    checkOffsets(compressedMessagesV1, 0)
-    checkOffsets(compressedMessagesV1.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                                       now = System.currentTimeMillis(),
-                                                                       sourceCodec = DefaultCompressionCodec,
-                                                                       targetCodec = DefaultCompressionCodec,
-                                                                       messageFormatVersion = 0,
-                                                                       messageTimestampType = TimestampType.CREATE_TIME,
-                                                                       messageTimestampDiffMaxMs = 5000L).validatedMessages, offset)
-  }
-
-  @Test
-  def testWriteFullyTo() {
-    checkWriteFullyToWithMessageSet(createMessageSet(Array[Message]()))
-    checkWriteFullyToWithMessageSet(createMessageSet(messages))
-  }
-
-  def checkWriteFullyToWithMessageSet(messageSet: ByteBufferMessageSet) {
-    checkWriteWithMessageSet(messageSet, messageSet.writeFullyTo)
-  }
-  
   /* check that offsets are assigned based on byte offset from the given base offset */
   def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) {
     assertTrue("Message set should not be empty", messages.nonEmpty)
@@ -457,59 +164,4 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     assertTrue(shallowOffsets.subsetOf(deepOffsets))
   }
 
-  private def getMessages(magicValue: Byte = Message.CurrentMagicValue,
-                          timestamp: Long = Message.NoTimestamp,
-                          codec: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet = {
-    if (magicValue == Message.MagicValue_V0) {
-      new ByteBufferMessageSet(
-        codec,
-        new Message("hello".getBytes, Message.NoTimestamp, Message.MagicValue_V0),
-        new Message("there".getBytes, Message.NoTimestamp, Message.MagicValue_V0),
-        new Message("beautiful".getBytes, Message.NoTimestamp, Message.MagicValue_V0))
-    } else {
-      new ByteBufferMessageSet(
-        codec,
-        new Message("hello".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1),
-        new Message("there".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1),
-        new Message("beautiful".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1))
-    }
-  }
-
-  private def messageSetWithInvalidInnerMagic(codec: CompressionCodec,
-                                              initialOffset: Long): ByteBufferMessageSet = {
-    val messages = (0 until 20).map(id =>
-      new Message(key = id.toString.getBytes,
-        bytes = id.toString.getBytes,
-        timestamp = Message.NoTimestamp,
-        magicValue = Message.MagicValue_V0))
-
-    val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
-    var lastOffset = initialOffset
-
-    messageWriter.write(
-      codec = codec,
-      timestamp = System.currentTimeMillis(),
-      timestampType = TimestampType.CREATE_TIME,
-      magicValue = Message.MagicValue_V1) { outputStream =>
-
-      val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream))
-      try {
-        for (message <- messages) {
-          val innerOffset = lastOffset - initialOffset
-          output.writeLong(innerOffset)
-          output.writeInt(message.size)
-          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
-          lastOffset += 1
-        }
-      } finally {
-        output.close()
-      }
-    }
-    val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
-    ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1)
-    buffer.rewind()
-
-    new ByteBufferMessageSet(buffer)
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index e8abfe1..5d2c8fb 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -70,7 +70,7 @@ class MessageCompressionTest extends JUnitSuite {
     testCompressSize(GZIPCompressionCodec, messages, 396)
 
     if(isSnappyAvailable)
-      testCompressSize(SnappyCompressionCodec, messages, 502)
+      testCompressSize(SnappyCompressionCodec, messages, 1063)
 
     if(isLZ4Available)
       testCompressSize(LZ4CompressionCodec, messages, 387)

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 5c02125..46c25af 100755
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -48,7 +48,7 @@ class MessageTest extends JUnitSuite {
     val magicValues = Array(Message.MagicValue_V0, Message.MagicValue_V1)
     for(k <- keys; v <- vals; codec <- codecs; t <- timestamps; mv <- magicValues) {
       val timestamp = ensureValid(mv, t)
-      messages += new MessageTestVal(k, v, codec, timestamp, mv, new Message(v, k, timestamp, codec, mv))
+      messages += MessageTestVal(k, v, codec, timestamp, mv, new Message(v, k, timestamp, codec, mv))
     }
 
     def ensureValid(magicValue: Byte, timestamp: Long): Long =
@@ -96,7 +96,7 @@ class MessageTest extends JUnitSuite {
 
   @Test
   def testEquality() {
-    for(v <- messages) {
+    for (v <- messages) {
       assertFalse("Should not equal null", v.message.equals(null))
       assertFalse("Should not equal a random string", v.message.equals("asdf"))
       assertTrue("Should equal itself", v.message.equals(v.message))
@@ -105,40 +105,6 @@ class MessageTest extends JUnitSuite {
     }
   }
 
-  @Test
-  def testMessageFormatConversion() {
-
-    def convertAndVerify(v: MessageTestVal, fromMessageFormat: Byte, toMessageFormat: Byte) {
-      assertEquals("Message should be the same when convert to the same version.",
-        v.message.toFormatVersion(fromMessageFormat), v.message)
-      val convertedMessage = v.message.toFormatVersion(toMessageFormat)
-      assertEquals("Size difference is not expected value", convertedMessage.size - v.message.size,
-        Message.headerSizeDiff(fromMessageFormat, toMessageFormat))
-      assertTrue("Message should still be valid", convertedMessage.isValid)
-      assertEquals("Timestamp should be NoTimestamp", convertedMessage.timestamp, Message.NoTimestamp)
-      assertEquals(s"Magic value should be $toMessageFormat now", convertedMessage.magic, toMessageFormat)
-      if (convertedMessage.hasKey)
-        assertEquals("Message key should not change", convertedMessage.key, ByteBuffer.wrap(v.key))
-      else
-        assertNull(convertedMessage.key)
-      if(v.payload == null) {
-        assertTrue(convertedMessage.isNull)
-        assertEquals("Payload should be null", null, convertedMessage.payload)
-      } else {
-        assertEquals("Message payload should not change", convertedMessage.payload, ByteBuffer.wrap(v.payload))
-      }
-      assertEquals("Compression codec should not change", convertedMessage.compressionCodec, v.codec)
-    }
-
-    for (v <- messages) {
-      if (v.magicValue == Message.MagicValue_V0) {
-        convertAndVerify(v, Message.MagicValue_V0, Message.MagicValue_V1)
-      } else if (v.magicValue == Message.MagicValue_V1) {
-        convertAndVerify(v, Message.MagicValue_V1, Message.MagicValue_V0)
-      }
-    }
-  }
-
   @Test(expected = classOf[IllegalArgumentException])
   def testInvalidTimestampAndMagicValueCombination() {
       new Message("hello".getBytes, 0L, Message.MagicValue_V0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 7d6ad91..5c9f035 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -19,17 +19,17 @@ package kafka.server
 
 import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
-import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
 import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.{mutable, Map}
+import scala.collection.{Map, mutable}
 
 class AbstractFetcherThreadTest {
 
@@ -91,10 +91,10 @@ class AbstractFetcherThreadTest {
     override def offset(topicAndPartition: TopicPartition): Long = offsets(topicAndPartition)
   }
 
-  class TestPartitionData(byteBufferMessageSet: ByteBufferMessageSet) extends PartitionData {
+  class TestPartitionData(records: MemoryRecords = MemoryRecords.EMPTY) extends PartitionData {
     override def errorCode: Short = Errors.NONE.code
 
-    override def toByteBufferMessageSet: ByteBufferMessageSet = byteBufferMessageSet
+    override def toRecords: MemoryRecords = records
 
     override def highWatermark: Long = 0L
 
@@ -119,7 +119,7 @@ class AbstractFetcherThreadTest {
     override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]): Unit = {}
 
     override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] =
-      fetchRequest.offsets.mapValues(_ => new TestPartitionData(new ByteBufferMessageSet())).toSeq
+      fetchRequest.offsets.mapValues(_ => new TestPartitionData()).toSeq
 
     override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
       new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.offset) }.toMap)
@@ -156,8 +156,8 @@ class AbstractFetcherThreadTest {
     @volatile var fetchCount = 0
 
     private val normalPartitionDataSet = List(
-      new TestPartitionData(new ByteBufferMessageSet(NoCompressionCodec, Seq(0L), new Message("hello".getBytes))),
-      new TestPartitionData(new ByteBufferMessageSet(NoCompressionCodec, Seq(1L), new Message("hello".getBytes)))
+      new TestPartitionData(MemoryRecords.withRecords(0L, Record.create("hello".getBytes()))),
+      new TestPartitionData(MemoryRecords.withRecords(1L, Record.create("hello".getBytes())))
     )
 
     override def processPartitionData(topicAndPartition: TopicPartition,
@@ -170,10 +170,10 @@ class AbstractFetcherThreadTest {
             .format(topicAndPartition, fetchOffset, logEndOffset))
 
       // Now check message's crc
-      val messages = partitionData.toByteBufferMessageSet
-      for (messageAndOffset <- messages.shallowIterator) {
-        messageAndOffset.message.ensureValid()
-        logEndOffset = messageAndOffset.nextOffset
+      val records = partitionData.toRecords
+      for (entry <- records.shallowIterator.asScala) {
+        entry.record.ensureValid()
+        logEndOffset = entry.nextOffset
       }
     }
 
@@ -181,12 +181,12 @@ class AbstractFetcherThreadTest {
       fetchCount += 1
       // Set the first fetch to get a corrupted message
       if (fetchCount == 1) {
-        val corruptedMessage = new Message("hello".getBytes)
-        val badChecksum = (corruptedMessage.checksum + 1 % Int.MaxValue).toInt
+        val corruptedRecord = Record.create("hello".getBytes())
+        val badChecksum = (corruptedRecord.checksum + 1 % Int.MaxValue).toInt
         // Garble checksum
-        Utils.writeUnsignedInt(corruptedMessage.buffer, Message.CrcOffset, badChecksum)
-        val byteBufferMessageSet = new ByteBufferMessageSet(NoCompressionCodec, corruptedMessage)
-        fetchRequest.offsets.mapValues(_ => new TestPartitionData(byteBufferMessageSet)).toSeq
+        Utils.writeUnsignedInt(corruptedRecord.buffer, Record.CRC_OFFSET, badChecksum)
+        val records = MemoryRecords.withRecords(corruptedRecord)
+        fetchRequest.offsets.mapValues(_ => new TestPartitionData(records)).toSeq
       } else
       // Then, the following fetches get the normal data
         fetchRequest.offsets.mapValues(v => normalPartitionDataSet(v.toInt)).toSeq

