kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/5] kafka git commit: KAFKA-3025; Added timestamp to Message and use relative offset.
Date Fri, 19 Feb 2016 15:56:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk eee95228f -> 45c8195fa


http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 511060e..77f5d65 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -19,9 +19,12 @@ package kafka.message
 
 import java.nio._
 import java.util.concurrent.atomic.AtomicLong
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.errors.InvalidTimestampException
+import org.apache.kafka.common.record.TimestampType
 import org.junit.Assert._
 import org.junit.Test
-import kafka.utils.TestUtils
 
 class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
 
@@ -135,27 +138,240 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
       verifyShallowIterator(mixedMessageSet)
     }
   }
-  
+
+  @Test
+  def testMessageWithProvidedOffsetSeq() {
+    val offsets = Seq(0L, 2L)
+    val messages = new ByteBufferMessageSet(
+      compressionCodec = NoCompressionCodec,
+      offsetSeq = offsets,
+      new Message("hello".getBytes),
+      new Message("goodbye".getBytes))
+    val iter = messages.iterator
+    assertEquals("first offset should be 0", 0L, iter.next().offset)
+    assertEquals("second offset should be 2", 2L, iter.next().offset)
+  }
+
+  @Test
+  def testLogAppendTime() {
+    val startTime = System.currentTimeMillis()
+    // The timestamps should be overwritten
+    val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = NoCompressionCodec)
+    val compressedMessagesWithRecompresion = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec)
+    val compressedMessagesWithoutRecompression =
+      getMessages(magicValue = Message.MagicValue_V1, timestamp = -1L, codec = DefaultCompressionCodec)
+
+    val validatedMessages = messages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+                                                                      sourceCodec = NoCompressionCodec,
+                                                                      targetCodec = NoCompressionCodec,
+                                                                      messageFormatVersion = 1,
+                                                                      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+                                                                      messageTimestampDiffMaxMs = 1000L)
+
+    val validatedCompressedMessages =
+      compressedMessagesWithRecompresion.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+                                                                          sourceCodec = DefaultCompressionCodec,
+                                                                          targetCodec = DefaultCompressionCodec,
+                                                                          messageFormatVersion = 1,
+                                                                          messageTimestampType = TimestampType.LOG_APPEND_TIME,
+                                                                          messageTimestampDiffMaxMs = 1000L)
+
+    val validatedCompressedMessagesWithoutRecompression =
+      compressedMessagesWithoutRecompression.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+                                                                              sourceCodec = DefaultCompressionCodec,
+                                                                              targetCodec = DefaultCompressionCodec,
+                                                                              messageFormatVersion = 1,
+                                                                              messageTimestampType = TimestampType.LOG_APPEND_TIME,
+                                                                              messageTimestampDiffMaxMs = 1000L)
+
+    val now = System.currentTimeMillis()
+    assertEquals("message set size should not change", messages.size, validatedMessages.size)
+    validatedMessages.foreach({case messageAndOffset => validateLogAppendTime(messageAndOffset.message)})
+
+    assertEquals("message set size should not change", compressedMessagesWithRecompresion.size, validatedCompressedMessages.size)
+    validatedCompressedMessages.foreach({case messageAndOffset => validateLogAppendTime(messageAndOffset.message)})
+    assertTrue("MessageSet should still valid", validatedCompressedMessages.shallowIterator.next().message.isValid)
+
+    assertEquals("message set size should not change", compressedMessagesWithoutRecompression.size,
+      validatedCompressedMessagesWithoutRecompression.size)
+    validatedCompressedMessagesWithoutRecompression.foreach({case messageAndOffset =>
+      validateLogAppendTime(messageAndOffset.message)})
+    assertTrue("MessageSet should still valid", validatedCompressedMessagesWithoutRecompression.shallowIterator.next().message.isValid)
+
+    def validateLogAppendTime(message: Message) {
+      message.ensureValid()
+      assertTrue(s"Timestamp of message $message should be between $startTime and $now",
+        message.timestamp >= startTime && message.timestamp <= now)
+      assertEquals(TimestampType.LOG_APPEND_TIME, message.timestampType)
+    }
+  }
+
+  @Test
+  def testCreateTime() {
+    val now = System.currentTimeMillis()
+    val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = NoCompressionCodec)
+    val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = DefaultCompressionCodec)
+
+    val validatedMessages = messages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+                                                                      sourceCodec = NoCompressionCodec,
+                                                                      targetCodec = NoCompressionCodec,
+                                                                      messageFormatVersion = 1,
+                                                                      messageTimestampType = TimestampType.CREATE_TIME,
+                                                                      messageTimestampDiffMaxMs = 1000L)
+
+    val validatedCompressedMessages =
+      compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+                                                          sourceCodec = DefaultCompressionCodec,
+                                                          targetCodec = DefaultCompressionCodec,
+                                                          messageFormatVersion = 1,
+                                                          messageTimestampType = TimestampType.CREATE_TIME,
+                                                          messageTimestampDiffMaxMs = 1000L)
+
+    for (messageAndOffset <- validatedMessages) {
+      messageAndOffset.message.ensureValid()
+      assertEquals(messageAndOffset.message.timestamp, now)
+      assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME)
+    }
+    for (messageAndOffset <- validatedCompressedMessages) {
+      messageAndOffset.message.ensureValid()
+      assertEquals(messageAndOffset.message.timestamp, now)
+      assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME)
+    }
+  }
+
   @Test
-  def testOffsetAssignment() {
-    val messages = new ByteBufferMessageSet(NoCompressionCodec,
-                                            new Message("hello".getBytes), 
-                                            new Message("there".getBytes), 
-                                            new Message("beautiful".getBytes))
-    val compressedMessages = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                                      messages = messages.map(_.message).toBuffer:_*)
-    // check uncompressed offsets 
+  def testInvalidCreateTime() {
+    val now = System.currentTimeMillis()
+    val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now - 1001L, codec = NoCompressionCodec)
+    val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now - 1001L, codec = DefaultCompressionCodec)
+
+    try {
+      messages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+                                                sourceCodec = NoCompressionCodec,
+                                                targetCodec = NoCompressionCodec,
+                                                messageFormatVersion = 1,
+                                                messageTimestampType = TimestampType.CREATE_TIME,
+                                                messageTimestampDiffMaxMs = 1000L)
+      fail("Should throw InvalidMessageException.")
+    } catch {
+      case e: InvalidTimestampException =>
+    }
+
+    try {
+      compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(0),
+                                                          sourceCodec = DefaultCompressionCodec,
+                                                          targetCodec = DefaultCompressionCodec,
+                                                          messageFormatVersion = 1,
+                                                          messageTimestampType = TimestampType.CREATE_TIME,
+                                                          messageTimestampDiffMaxMs = 1000L)
+      fail("Should throw InvalidMessageException.")
+    } catch {
+      case e: InvalidTimestampException =>
+    }
+  }
+
+  @Test
+  def testAbsoluteOffsetAssignment() {
+    val messages = getMessages(magicValue = Message.MagicValue_V0, codec = NoCompressionCodec)
+    val compressedMessages = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec)
+    // check uncompressed offsets
     checkOffsets(messages, 0)
-    var offset = 1234567
-    checkOffsets(messages.validateMessagesAndAssignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset)
+    val offset = 1234567
+    checkOffsets(messages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+                                                           sourceCodec = NoCompressionCodec,
+                                                           targetCodec = NoCompressionCodec,
+                                                           messageFormatVersion = 0,
+                                                           messageTimestampType = TimestampType.CREATE_TIME,
+                                                           messageTimestampDiffMaxMs = 1000L), offset)
 
     // check compressed messages
     checkOffsets(compressedMessages, 0)
-    checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
+    checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+                                                                     sourceCodec = DefaultCompressionCodec,
+                                                                     targetCodec = DefaultCompressionCodec,
+                                                                     messageFormatVersion = 0,
+                                                                     messageTimestampType = TimestampType.CREATE_TIME,
+                                                                     messageTimestampDiffMaxMs = 1000L), offset)
+
+  }
+
+  @Test
+  def testRelativeOffsetAssignment() {
+    val now = System.currentTimeMillis()
+    val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = NoCompressionCodec)
+    val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = DefaultCompressionCodec)
+
+    // check uncompressed offsets
+    checkOffsets(messages, 0)
+    val offset = 1234567
+    val messageWithOffset = messages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+                                                                      sourceCodec = NoCompressionCodec,
+                                                                      targetCodec = NoCompressionCodec,
+                                                                      messageTimestampType = TimestampType.CREATE_TIME,
+                                                                      messageTimestampDiffMaxMs = 5000L)
+    checkOffsets(messageWithOffset, offset)
+
+    // check compressed messages
+    checkOffsets(compressedMessages, 0)
+    val compressedMessagesWithOffset = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+                                                                                           sourceCodec = DefaultCompressionCodec,
+                                                                                           targetCodec = DefaultCompressionCodec,
+                                                                                           messageTimestampType = TimestampType.CREATE_TIME,
+                                                                                           messageTimestampDiffMaxMs = 5000L)
+    checkOffsets(compressedMessagesWithOffset, offset)
+  }
+
+  @Test
+  def testOffsetAssignmentAfterMessageFormatConversion() {
+    // Check up conversion
+    val messagesV0 = getMessages(magicValue = Message.MagicValue_V0, codec = NoCompressionCodec)
+    val compressedMessagesV0 = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec)
+    // check uncompressed offsets
+    checkOffsets(messagesV0, 0)
+    val offset = 1234567
+    checkOffsets(messagesV0.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+                                                             sourceCodec = NoCompressionCodec,
+                                                             targetCodec = NoCompressionCodec,
+                                                             messageFormatVersion = 1,
+                                                             messageTimestampType = TimestampType.LOG_APPEND_TIME,
+                                                             messageTimestampDiffMaxMs = 1000L), offset)
+
+    // check compressed messages
+    checkOffsets(compressedMessagesV0, 0)
+    checkOffsets(compressedMessagesV0.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+                                                                       sourceCodec = DefaultCompressionCodec,
+                                                                       targetCodec = DefaultCompressionCodec,
+                                                                       messageFormatVersion = 1,
+                                                                       messageTimestampType = TimestampType.LOG_APPEND_TIME,
+                                                                       messageTimestampDiffMaxMs = 1000L), offset)
+
+    // Check down conversion
+    val now = System.currentTimeMillis()
+    val messagesV1 = getMessages(Message.MagicValue_V1, now, NoCompressionCodec)
+    val compressedMessagesV1 = getMessages(Message.MagicValue_V1, now, DefaultCompressionCodec)
+
+    // check uncompressed offsets
+    checkOffsets(messagesV1, 0)
+    checkOffsets(messagesV1.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+                                                             sourceCodec = NoCompressionCodec,
+                                                             targetCodec = NoCompressionCodec,
+                                                             messageFormatVersion = 0,
+                                                             messageTimestampType = TimestampType.CREATE_TIME,
+                                                             messageTimestampDiffMaxMs = 5000L), offset)
+
+    // check compressed messages
+    checkOffsets(compressedMessagesV1, 0)
+    checkOffsets(compressedMessagesV1.validateMessagesAndAssignOffsets(offsetCounter = new AtomicLong(offset),
+                                                                       sourceCodec = DefaultCompressionCodec,
+                                                                       targetCodec = DefaultCompressionCodec,
+                                                                       messageFormatVersion = 0,
+                                                                       messageTimestampType = TimestampType.CREATE_TIME,
+                                                                       messageTimestampDiffMaxMs = 5000L), offset)
   }
   
   /* check that offsets are assigned based on byte offset from the given base offset */
   def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) {
+    assertTrue("Message set should not be empty", messages.size > 0)
     var offset = baseOffset
     for(entry <- messages) {
       assertEquals("Unexpected offset in message set iterator", offset, entry.offset)
@@ -169,4 +385,22 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet
     assertTrue(shallowOffsets.subsetOf(deepOffsets))
   }
+
+  private def getMessages(magicValue: Byte = Message.CurrentMagicValue,
+                          timestamp: Long = Message.NoTimestamp,
+                          codec: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet = {
+    if (magicValue == Message.MagicValue_V0) {
+      new ByteBufferMessageSet(
+        codec,
+        new Message("hello".getBytes, Message.NoTimestamp, Message.MagicValue_V0),
+        new Message("there".getBytes, Message.NoTimestamp, Message.MagicValue_V0),
+        new Message("beautiful".getBytes, Message.NoTimestamp, Message.MagicValue_V0))
+    } else {
+      new ByteBufferMessageSet(
+        codec,
+        new Message("hello".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1),
+        new Message("there".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1),
+        new Message("beautiful".getBytes, timestamp = timestamp, magicValue = Message.MagicValue_V1))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index d8613f7..53b85ef 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -42,15 +42,17 @@ class MessageCompressionTest extends JUnitSuite {
     val bytes1k: Array[Byte] = (0 until 1000).map(_.toByte).toArray
     val bytes2k: Array[Byte] = (1000 until 2000).map(_.toByte).toArray
     val bytes3k: Array[Byte] = (3000 until 4000).map(_.toByte).toArray
-    val messages: List[Message] = List(new Message(bytes1k), new Message(bytes2k), new Message(bytes3k))
+    val messages: List[Message] = List(new Message(bytes1k, Message.NoTimestamp, Message.MagicValue_V1),
+                                       new Message(bytes2k, Message.NoTimestamp, Message.MagicValue_V1),
+                                       new Message(bytes3k, Message.NoTimestamp, Message.MagicValue_V1))
 
-    testCompressSize(GZIPCompressionCodec, messages, 388)
+    testCompressSize(GZIPCompressionCodec, messages, 396)
 
     if(isSnappyAvailable)
-      testCompressSize(SnappyCompressionCodec, messages, 491)
+      testCompressSize(SnappyCompressionCodec, messages, 502)
 
     if(isLZ4Available)
-      testCompressSize(LZ4CompressionCodec, messages, 380)
+      testCompressSize(LZ4CompressionCodec, messages, 387)
   }
 
   def testSimpleCompressDecompress(compressionCodec: CompressionCodec) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 1755633..3c8a41f 100755
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -26,12 +26,13 @@ import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{Before, Test}
 import kafka.utils.TestUtils
-import kafka.utils.CoreUtils
 import org.apache.kafka.common.utils.Utils
 
-case class MessageTestVal(val key: Array[Byte],
+case class MessageTestVal(val key: Array[Byte], 
                           val payload: Array[Byte],
                           val codec: CompressionCodec,
+                          val timestamp: Long,
+                          val magicValue: Byte,
                           val message: Message)
 
 class MessageTest extends JUnitSuite {
@@ -43,24 +44,41 @@ class MessageTest extends JUnitSuite {
     val keys = Array(null, "key".getBytes, "".getBytes)
     val vals = Array("value".getBytes, "".getBytes, null)
     val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec)
-    for(k <- keys; v <- vals; codec <- codecs)
-      messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
+    val timestamps = Array(Message.NoTimestamp, 0L, 1L)
+    val magicValues = Array(Message.MagicValue_V0, Message.MagicValue_V1)
+    for(k <- keys; v <- vals; codec <- codecs; t <- timestamps; mv <- magicValues) {
+      val timestamp = ensureValid(mv, t)
+      messages += new MessageTestVal(k, v, codec, timestamp, mv, new Message(v, k, timestamp, codec, mv))
+    }
+
+    def ensureValid(magicValue: Byte, timestamp: Long): Long =
+      if (magicValue > Message.MagicValue_V0) timestamp else Message.NoTimestamp
   }
 
   @Test
   def testFieldValues {
     for(v <- messages) {
+      // check payload
       if(v.payload == null) {
         assertTrue(v.message.isNull)
         assertEquals("Payload should be null", null, v.message.payload)
       } else {
         TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload)
       }
-      assertEquals(Message.CurrentMagicValue, v.message.magic)
+      // check timestamp
+      if (v.magicValue > Message.MagicValue_V0)
+        assertEquals("Timestamp should be the same", v.timestamp, v.message.timestamp)
+       else
+        assertEquals("Timestamp should be the NoTimestamp", Message.NoTimestamp, v.message.timestamp)
+
+      // check magic value
+      assertEquals(v.magicValue, v.message.magic)
+      // check key
       if(v.message.hasKey)
         TestUtils.checkEquals(ByteBuffer.wrap(v.key), v.message.key)
       else
         assertEquals(null, v.message.key)
+      // check compression codec
       assertEquals(v.codec, v.message.compressionCodec)
     }
   }
@@ -82,12 +100,61 @@ class MessageTest extends JUnitSuite {
       assertFalse("Should not equal null", v.message.equals(null))
       assertFalse("Should not equal a random string", v.message.equals("asdf"))
       assertTrue("Should equal itself", v.message.equals(v.message))
-      val copy = new Message(bytes = v.payload, key = v.key, codec = v.codec)
+      val copy = new Message(bytes = v.payload, key = v.key, v.timestamp, codec = v.codec, v.magicValue)
       assertTrue("Should equal another message with the same content.", v.message.equals(copy))
     }
   }
 
   @Test
+  def testMessageFormatConversion() {
+
+    def convertAndVerify(v: MessageTestVal, fromMessageFormat: Byte, toMessageFormat: Byte) {
+      assertEquals("Message should be the same when convert to the same version.",
+        v.message.toFormatVersion(fromMessageFormat), v.message)
+      val convertedMessage = v.message.toFormatVersion(toMessageFormat)
+      assertEquals("Size difference is not expected value", convertedMessage.size - v.message.size,
+        Message.headerSizeDiff(fromMessageFormat, toMessageFormat))
+      assertTrue("Message should still be valid", convertedMessage.isValid)
+      assertEquals("Timestamp should be NoTimestamp", convertedMessage.timestamp, Message.NoTimestamp)
+      assertEquals(s"Magic value should be $toMessageFormat now", convertedMessage.magic, toMessageFormat)
+      if (convertedMessage.hasKey)
+        assertEquals("Message key should not change", convertedMessage.key, ByteBuffer.wrap(v.key))
+      else
+        assertNull(convertedMessage.key)
+      if(v.payload == null) {
+        assertTrue(convertedMessage.isNull)
+        assertEquals("Payload should be null", null, convertedMessage.payload)
+      } else {
+        assertEquals("Message payload should not change", convertedMessage.payload, ByteBuffer.wrap(v.payload))
+      }
+      assertEquals("Compression codec should not change", convertedMessage.compressionCodec, v.codec)
+    }
+
+    for (v <- messages) {
+      if (v.magicValue == Message.MagicValue_V0) {
+        convertAndVerify(v, Message.MagicValue_V0, Message.MagicValue_V1)
+      } else if (v.magicValue == Message.MagicValue_V1) {
+        convertAndVerify(v, Message.MagicValue_V1, Message.MagicValue_V0)
+      }
+    }
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testInvalidTimestampAndMagicValueCombination() {
+      new Message("hello".getBytes, 0L, Message.MagicValue_V0)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testInvalidTimestamp() {
+    new Message("hello".getBytes, -3L, Message.MagicValue_V1)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testInvalidMagicByte() {
+    new Message("hello".getBytes, 0L, 2.toByte)
+  }
+
+  @Test
   def testIsHashable() {
     // this is silly, but why not
     val m = new HashMap[Message, Message]()

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala b/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
index 3993fdb..6f0ee1d 100644
--- a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
@@ -20,6 +20,7 @@ package kafka.message
 import java.io.{InputStream, ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer
 import java.util.Random
+import org.apache.kafka.common.record.TimestampType
 import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite
@@ -34,7 +35,7 @@ class MessageWriterTest extends JUnitSuite {
 
   private def mkMessageWithWriter(key: Array[Byte] = null, bytes: Array[Byte], codec: CompressionCodec): Message = {
     val writer = new MessageWriter(100)
-    writer.write(key = key, codec = codec) { output =>
+    writer.write(key = key, codec = codec, timestamp = Message.NoTimestamp, timestampType = TimestampType.CREATE_TIME, magicValue = Message.MagicValue_V1) { output =>
       val out = if (codec == NoCompressionCodec) output else CompressionFactory(codec, output)
       try {
         val p = rnd.nextInt(bytes.length)
@@ -101,7 +102,7 @@ class MessageWriterTest extends JUnitSuite {
   def testWithNoCompressionAttribute(): Unit = {
     val bytes = mkRandomArray(4096)
     val actual = mkMessageWithWriter(bytes = bytes, codec = NoCompressionCodec)
-    val expected = new Message(bytes, NoCompressionCodec)
+    val expected = new Message(bytes, Message.NoTimestamp, NoCompressionCodec, Message.MagicValue_V1)
     assertEquals(expected.buffer, actual.buffer)
   }
 
@@ -109,7 +110,7 @@ class MessageWriterTest extends JUnitSuite {
   def testWithCompressionAttribute(): Unit = {
     val bytes = mkRandomArray(4096)
     val actual = mkMessageWithWriter(bytes = bytes, codec = SnappyCompressionCodec)
-    val expected = new Message(compress(bytes, SnappyCompressionCodec), SnappyCompressionCodec)
+    val expected = new Message(compress(bytes, SnappyCompressionCodec), Message.NoTimestamp, SnappyCompressionCodec, Message.MagicValue_V1)
 
     assertEquals(
       decompress(toArray(expected.payload), SnappyCompressionCodec).toSeq,
@@ -122,7 +123,7 @@ class MessageWriterTest extends JUnitSuite {
     val key = mkRandomArray(123)
     val bytes = mkRandomArray(4096)
     val actual = mkMessageWithWriter(bytes = bytes, key = key, codec = NoCompressionCodec)
-    val expected = new Message(bytes = bytes, key = key, codec = NoCompressionCodec)
+    val expected = new Message(bytes = bytes, key = key, timestamp = Message.NoTimestamp, codec = NoCompressionCodec, magicValue = Message.MagicValue_V1)
 
     assertEquals(expected.buffer, actual.buffer)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 60d2588..f711ca4 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -396,13 +396,18 @@ class AsyncProducerTest {
     EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(4)
     EasyMock.expect(producerPool.close())
     EasyMock.replay(producerPool)
-
+    val time = new Time {
+      override def nanoseconds: Long = 0L
+      override def milliseconds: Long = 0L
+      override def sleep(ms: Long): Unit = {}
+    }
     val handler = new DefaultEventHandler[Int,String](config,
                                                       partitioner = new FixedValuePartitioner(),
                                                       encoder = new StringEncoder(),
                                                       keyEncoder = new NullEncoder[Int](),
                                                       producerPool = producerPool,
-                                                      topicPartitionInfos = topicPartitionInfos)
+                                                      topicPartitionInfos = topicPartitionInfos,
+                                                      time = time)
     val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
     handler.handle(data)
     handler.close()
@@ -466,11 +471,13 @@ class AsyncProducerTest {
   }
 
   def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
-    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(m.getBytes)): _*)
+    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(m.getBytes, 0L, Message.MagicValue_V1)): _*)
   }
 
   def messagesToSet(key: Array[Byte], messages: Seq[Array[Byte]]): ByteBufferMessageSet = {
-    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(key = key, bytes = m)): _*)
+    new ByteBufferMessageSet(
+      NoCompressionCodec,
+      messages.map(m => new Message(key = key, bytes = m, timestamp = 0L, magicValue = Message.MagicValue_V1)): _*)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index f356a69..12b3583 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.producer
 
+import java.nio.ByteBuffer
 import java.util
 import java.util.Properties
 
@@ -30,6 +31,7 @@ import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
 import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.TimestampType
 import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -164,10 +166,11 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       keyEncoder = classOf[StringEncoder].getName,
       partitioner = classOf[StaticPartitioner].getName,
       producerProps = props1)
-
+    val startTime = System.currentTimeMillis()
     // Available partition ids should be 0.
     producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
     producer1.send(new KeyedMessage[String, String](topic, "test", "test2"))
+    val endTime = System.currentTimeMillis()
     // get the leader
     val leaderOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
@@ -181,8 +184,19 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       response2.messageSet("new-topic", 0).iterator.toBuffer
     }
     assertEquals("Should have fetched 2 messages", 2, messageSet.size)
-    assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet(0).message)
-    assertEquals(new Message(bytes = "test2".getBytes, key = "test".getBytes), messageSet(1).message)
+    // Message 1
+    assertTrue(ByteBuffer.wrap("test1".getBytes).equals(messageSet(0).message.payload))
+    assertTrue(ByteBuffer.wrap("test".getBytes).equals(messageSet(0).message.key))
+    assertTrue(messageSet(0).message.timestamp >= startTime && messageSet(0).message.timestamp < endTime)
+    assertEquals(TimestampType.CREATE_TIME, messageSet(0).message.timestampType)
+    assertEquals(Message.MagicValue_V1, messageSet(0).message.magic)
+
+    // Message 2
+    assertTrue(ByteBuffer.wrap("test2".getBytes).equals(messageSet(1).message.payload))
+    assertTrue(ByteBuffer.wrap("test".getBytes).equals(messageSet(1).message.key))
+    assertTrue(messageSet(1).message.timestamp >= startTime && messageSet(1).message.timestamp < endTime)
+    assertEquals(TimestampType.CREATE_TIME, messageSet(1).message.timestampType)
+    assertEquals(Message.MagicValue_V1, messageSet(1).message.magic)
     producer1.close()
 
     val props2 = new util.Properties()
@@ -227,7 +241,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       keyEncoder = classOf[StringEncoder].getName,
       partitioner = classOf[StaticPartitioner].getName,
       producerProps = props)
-
+    val startTime = System.currentTimeMillis()
     try {
       // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
       // on broker 0
@@ -235,7 +249,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
     } catch {
       case e: Throwable => fail("Unexpected exception: " + e)
     }
-
+    val endTime = System.currentTimeMillis()
     // kill the broker
     server1.shutdown
     server1.awaitShutdown()
@@ -260,7 +274,12 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       val messageSet1 = response1.messageSet(topic, 0).iterator
       assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet1.next.message)
+      val message = messageSet1.next.message
+      assertTrue(ByteBuffer.wrap("test1".getBytes).equals(message.payload))
+      assertTrue(ByteBuffer.wrap("test".getBytes).equals(message.key))
+      assertTrue(message.timestamp >= startTime && message.timestamp < endTime)
+      assertEquals(TimestampType.CREATE_TIME, message.timestampType)
+      assertEquals(Message.MagicValue_V1, message.magic)
       assertFalse("Message set should have another message", messageSet1.hasNext)
     } catch {
       case e: Exception => fail("Not expected", e)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 6e7b964..c1034fe 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -20,7 +20,6 @@ package kafka.producer
 import java.net.SocketTimeoutException
 import java.util.Properties
 
-import org.junit.Assert
 import kafka.admin.AdminUtils
 import kafka.api.ProducerResponseStatus
 import kafka.common.TopicAndPartition
@@ -30,6 +29,7 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.junit.Test
+import org.junit.Assert._
 
 class SyncProducerTest extends KafkaServerTestHarness {
   private val messageBytes =  new Array[Byte](2)
@@ -48,28 +48,28 @@ class SyncProducerTest extends KafkaServerTestHarness {
     try {
       val response = producer.send(TestUtils.produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
-      Assert.assertNotNull(response)
+      assertNotNull(response)
     } catch {
-      case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage)
+      case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
     }
     val firstEnd = SystemTime.milliseconds
-    Assert.assertTrue((firstEnd-firstStart) < 500)
+    assertTrue((firstEnd-firstStart) < 500)
     val secondStart = SystemTime.milliseconds
     try {
       val response = producer.send(TestUtils.produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
-      Assert.assertNotNull(response)
+      assertNotNull(response)
     } catch {
-      case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage)
+      case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
     }
     val secondEnd = SystemTime.milliseconds
-    Assert.assertTrue((secondEnd-secondStart) < 500)
+    assertTrue((secondEnd-secondStart) < 500)
     try {
       val response = producer.send(TestUtils.produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
-      Assert.assertNotNull(response)
+      assertNotNull(response)
     } catch {
-      case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage)
+      case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
     }
   }
 
@@ -87,8 +87,8 @@ class SyncProducerTest extends KafkaServerTestHarness {
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
-    Assert.assertTrue(response != null)
-    Assert.assertTrue(!response.hasError && response.status.size == 0)
+    assertTrue(response != null)
+    assertTrue(!response.hasError && response.status.size == 0)
   }
 
   @Test
@@ -103,18 +103,18 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
     val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1))
 
-    Assert.assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
-    Assert.assertEquals(Errors.MESSAGE_TOO_LARGE.code, response1.status(TopicAndPartition("test", 0)).error)
-    Assert.assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
+    assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
+    assertEquals(Errors.MESSAGE_TOO_LARGE.code, response1.status(TopicAndPartition("test", 0)).error)
+    assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
 
-    val safeSize = configs(0).messageMaxBytes - Message.MessageOverhead - MessageSet.LogOverhead - 1
+    val safeSize = configs(0).messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1
     val message2 = new Message(new Array[Byte](safeSize))
     val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
     val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1))
 
-    Assert.assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
-    Assert.assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("test", 0)).error)
-    Assert.assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
+    assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
+    assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("test", 0)).error)
+    assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
   }
 
 
@@ -157,13 +157,14 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1)
     val response = producer.send(request)
 
-    Assert.assertNotNull(response)
-    Assert.assertEquals(request.correlationId, response.correlationId)
-    Assert.assertEquals(3, response.status.size)
+    assertNotNull(response)
+    assertEquals(request.correlationId, response.correlationId)
+    assertEquals(3, response.status.size)
     response.status.values.foreach {
-      case ProducerResponseStatus(error, nextOffset) =>
-        Assert.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, error)
-        Assert.assertEquals(-1L, nextOffset)
+      case ProducerResponseStatus(error, nextOffset, timestamp) =>
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, error)
+        assertEquals(-1L, nextOffset)
+        assertEquals(Message.NoTimestamp, timestamp)
     }
 
     // #2 - test that we get correct offsets when partition is owned by broker
@@ -173,20 +174,20 @@ class SyncProducerTest extends KafkaServerTestHarness {
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic3", 0)
 
     val response2 = producer.send(request)
-    Assert.assertNotNull(response2)
-    Assert.assertEquals(request.correlationId, response2.correlationId)
-    Assert.assertEquals(3, response2.status.size)
+    assertNotNull(response2)
+    assertEquals(request.correlationId, response2.correlationId)
+    assertEquals(3, response2.status.size)
 
     // the first and last message should have been accepted by broker
-    Assert.assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("topic1", 0)).error)
-    Assert.assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("topic3", 0)).error)
-    Assert.assertEquals(0, response2.status(TopicAndPartition("topic1", 0)).offset)
-    Assert.assertEquals(0, response2.status(TopicAndPartition("topic3", 0)).offset)
+    assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("topic1", 0)).error)
+    assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("topic3", 0)).error)
+    assertEquals(0, response2.status(TopicAndPartition("topic1", 0)).offset)
+    assertEquals(0, response2.status(TopicAndPartition("topic3", 0)).offset)
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
                         response2.status(TopicAndPartition("topic2", 0)).error)
-    Assert.assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).offset)
+    assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).offset)
   }
 
   @Test
@@ -207,14 +208,14 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val t1 = SystemTime.milliseconds
     try {
       producer.send(request)
-      Assert.fail("Should have received timeout exception since request handling is stopped.")
+      fail("Should have received timeout exception since request handling is stopped.")
     } catch {
       case e: SocketTimeoutException => /* success */
-      case e: Throwable => Assert.fail("Unexpected exception when expecting timeout: " + e)
+      case e: Throwable => fail("Unexpected exception when expecting timeout: " + e)
     }
     val t2 = SystemTime.milliseconds
     // make sure we don't wait fewer than timeoutMs for a response
-    Assert.assertTrue((t2-t1) >= timeoutMs)
+    assertTrue((t2-t1) >= timeoutMs)
   }
 
   @Test
@@ -230,7 +231,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
-    Assert.assertTrue(response == null)
+    assertTrue(response == null)
   }
 
   @Test
@@ -250,6 +251,6 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val response = producer.send(TestUtils.produceRequest(topicName, 0,
       new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1))
 
-    Assert.assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, response.status(TopicAndPartition(topicName, 0)).error)
+    assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, response.status(TopicAndPartition(topicName, 0)).error)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 155eea0..2ccb7b8 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -114,7 +114,7 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
 
     val serializedBytes = {
-      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, 1, null, correlationId)
+      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, 2, null, correlationId)
       val messageBytes = "message".getBytes
       val request = new ProduceRequest(1, 10000, Map(topicPartition -> ByteBuffer.wrap(messageBytes)).asJava)
       val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.sizeOf)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2479b37..7fe9ffc 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -20,7 +20,7 @@ package kafka.server
 import java.util.Properties
 
 import junit.framework.Assert._
-import kafka.api.{ApiVersion, KAFKA_082}
+import kafka.api.{ApiVersion, KAFKA_0_8_2}
 import kafka.message._
 import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.common.config.ConfigException
@@ -281,14 +281,18 @@ class KafkaConfigTest {
     val conf = KafkaConfig.fromProps(props)
     assertEquals(ApiVersion.latestVersion, conf.interBrokerProtocolVersion)
 
-    props.put(KafkaConfig.InterBrokerProtocolVersionProp,"0.8.2.0")
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0")
+    // We need to set the message format version to make the configuration valid.
+    props.put(KafkaConfig.MessageFormatVersionProp, "0.8.2.0")
     val conf2 = KafkaConfig.fromProps(props)
-    assertEquals(KAFKA_082, conf2.interBrokerProtocolVersion)
+    assertEquals(KAFKA_0_8_2, conf2.interBrokerProtocolVersion)
 
     // check that 0.8.2.0 is the same as 0.8.2.1
-    props.put(KafkaConfig.InterBrokerProtocolVersionProp,"0.8.2.1")
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1")
+    // We need to set the message format version to make the configuration valid
+    props.put(KafkaConfig.MessageFormatVersionProp, "0.8.2.1")
     val conf3 = KafkaConfig.fromProps(props)
-    assertEquals(KAFKA_082, conf3.interBrokerProtocolVersion)
+    assertEquals(KAFKA_0_8_2, conf3.interBrokerProtocolVersion)
 
     //check that latest is newer than 0.8.2
     assert(ApiVersion.latestVersion.onOrAfter(conf3.interBrokerProtocolVersion))
@@ -460,7 +464,7 @@ class KafkaConfigTest {
         case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.LogDirsProp => // ignore string
         case KafkaConfig.LogDirProp => // ignore string
-        case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Message.MinHeaderSize - 1)
+        case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Message.MinMessageOverhead - 1)
 
         case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index b6bc4fc..5c2092c 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -27,7 +27,7 @@ import kafka.consumer.SimpleConsumer
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.AdminUtils
-import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
+import kafka.api.{ApiVersion, PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
 import kafka.common.TopicAndPartition
 import org.junit.After
@@ -91,17 +91,17 @@ class LogOffsetTest extends ZooKeeperTestHarness {
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
-    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 10)
-    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0), offsets)
+    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
+    assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest = OffsetRequest(
-      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 15)),
       replicaId = 0)
     val consumerOffsets =
       simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0), consumerOffsets)
+    assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets)
 
     // try to fetch using latest offset
     val fetchResponse = simpleConsumer.fetch(
@@ -154,15 +154,15 @@ class LogOffsetTest extends ZooKeeperTestHarness {
 
     val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
 
-    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), now, 10)
-    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), offsets)
+    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), now, 15)
+    assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
     val topicAndPartition = TopicAndPartition(topic, part)
-    val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
+    val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 15)), replicaId = 0)
     val consumerOffsets =
       simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), consumerOffsets)
+    assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets)
   }
 
   @Test
@@ -206,6 +206,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     props.put("log.retention.check.interval.ms", (5*1000*60).toString)
     props.put("log.segment.bytes", logSize.toString)
     props.put("zookeeper.connect", zkConnect.toString)
+    props.put("message.format.version", "0.10.0")
     props
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 66052fc..de92a24 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -35,11 +35,11 @@ class ConsoleConsumerTest extends JUnitSuite {
     val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
 
     //Stubs
-    val record = new BaseConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())
+    val record = new BaseConsumerRecord(topic = "foo", partition = 1, offset = 1, key = Array[Byte](), value = Array[Byte]())
 
     //Expectations
     val messageLimit: Int = 10
-    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
+    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
     EasyMock.expect(consumer.receive()).andReturn(record).times(messageLimit)
 
     EasyMock.replay(consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 29e1082..2523083 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -282,8 +282,11 @@ object TestUtils extends Logging {
    * Wrap the message in a message set
    * @param payload The bytes of the message
    */
-  def singleMessageSet(payload: Array[Byte], codec: CompressionCodec = NoCompressionCodec, key: Array[Byte] = null) =
-    new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key))
+  def singleMessageSet(payload: Array[Byte],
+                       codec: CompressionCodec = NoCompressionCodec,
+                       key: Array[Byte] = null,
+                       magicValue: Byte = Message.CurrentMagicValue) =
+    new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key, Message.NoTimestamp, magicValue))
 
   /**
    * Generate an array of random bytes

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ba59cc0..f6d67eb 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -17,6 +17,58 @@
 
 <h3><a id="upgrade" href="#upgrade">1.5 Upgrading From Previous Versions</a></h3>
 
+<h4><a id="upgrade_10" href="#upgrade_10">Upgrading from 0.8.x or 0.9.x to 0.10.0.0</a></h4>
+0.10.0.0 has <a href="#upgrade_10_performance_impact">potential performance impact during upgrade</a> and
+<a href="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading). Because new protocols
+are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li> Update the server.properties file on all brokers and add the following property: inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0.0).
+         We recommend that users also set message.format.version=CURRENT_KAFKA_VERSION to avoid a performance regression
+         during the upgrade. See <a href="#upgrade_10_performance_impact">potential performance impact during upgrade</a> for details.
+    </li>
+    <li> Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it. </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.0.0. </li>
+    <li> Restart the brokers one by one for the new protocol version to take effect. </li>
+</ol>
+
+<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code, and start all of them. They will start with the new protocol by default.</p>
+
+<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.</p>
+
+<h5><a id="upgrade_10_performance_impact" href="#upgrade_10_performance_impact">Potential performance impact in 0.10.0.0 during upgrade</a></h5>
+<p>
+    The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages.
+    The on-disk message format can be configured through message.format.version in the server.properties file.
+    The default on-disk message format is 0.10.0. A consumer client on a version before 0.10.0.0 only understands
+    message formats before 0.10.0. In that case, the broker is able to convert messages from the 0.10.0 format to an earlier
+    format before sending the response to the consumer on the older version. However, the broker cannot use zero-copy transfer in this case.
+
+    To avoid such message conversion before consumers are upgraded to 0.10.0.0, one can set the message format to 0.9.0 when
+    upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy transfer to send the data to the old
+    consumers. Once most consumers are upgraded, one can change the message format to 0.10.0 on the broker.
+</p>
+<p>
+    For clients that are upgraded to 0.10.0.0, there is no performance impact.
+</p>
+<p>
+    <b>Note:</b> By setting the message format version, one certifies that all existing messages are on or below that
+    message format version. Otherwise consumers on versions before 0.10.0.0 might break. In particular, after the message format
+    is set to 0.10.0, one should not change it back to an earlier format since it may break consumers on versions before 0.10.0.0.
+</p>
+
+<h5><a id="upgrade_10_breaking" href="#upgrade_10_breaking">Potential breaking changes in 0.10.0.0</a></h5>
+<ul>
+    <li> Starting from Kafka 0.10.0.0, message format version in Kafka is represented as the Kafka version. For example, message format 0.9.0 refers to the highest message version supported by Kafka 0.9.0. </li>
+    <li> Message format 0.10.0 is added and used by default to include a timestamp field in the messages and use relative offsets for compressed messages. </li>
+    <li> ProduceRequest/Response v2 has been added and is used by default to support message format 0.10.0. </li>
+    <li> FetchRequest/Response v2 has been added and is used by default to support message format 0.10.0. </li>
+    <li> MessageFormatter interface changed from <code>void writeTo(byte[] key, byte[] value, PrintStream output)</code> to
+         <code>void writeTo(byte[] key, byte[] value, long timestamp, TimestampType timestampType, PrintStream output)</code> </li>
+</ul>
+
 <h4><a id="upgrade_9" href="#upgrade_9">Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0</a></h4>
 
 0.9.0.0 has <a href="#upgrade_9_breaking">potential breaking changes</a> (please review before upgrading) and an inter-broker protocol change from previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker downstream clusters should be upgraded first as well.

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 66f78d2..0787204 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.ArrayDeque;
@@ -76,7 +77,8 @@ public class RecordQueue {
             Object key = source.deserializeKey(rawRecord.topic(), rawRecord.key());
             Object value = source.deserializeValue(rawRecord.topic(), rawRecord.value());
 
-            ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), key, value);
+            ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(),
+                rawRecord.offset(), rawRecord.timestamp(), TimestampType.CREATE_TIME, key, value);
             long timestamp = timestampExtractor.extract(record);
 
             StampedRecord stampedRecord = new StampedRecord(record, timestamp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index b91acdc..9d0c0e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -59,17 +60,17 @@ public class PartitionGroupTest {
 
         // add three records with timestamp 1, 3, 5 to partition-1
         List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 5, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
 
         group.addRawRecords(partition1, list1);
 
         // add three records with timestamp 2, 4, 6 to partition-2
         List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 6, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
 
         group.addRawRecords(partition2, list2);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index bc6f71b..916079d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
@@ -83,7 +84,8 @@ public class ProcessorStateManagerTest {
         // buffer a record (we cannot use addRecord because we need to add records before assigning a partition)
         public void bufferRecord(ConsumerRecord<Integer, Integer> record) {
             recordBuffer.add(
-                new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
+                new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), 0L,
+                    TimestampType.CREATE_TIME,
                     serializer.serialize(record.topic(), record.key()),
                     serializer.serialize(record.topic(), record.value())));
             endOffset = record.offset();
@@ -267,7 +269,7 @@ public class ProcessorStateManagerTest {
                     int key = i * 10;
                     expectedKeys.add(key);
                     restoreConsumer.bufferRecord(
-                            new ConsumerRecord<>(persistentStoreTopicName, 2, offset, key, 0)
+                            new ConsumerRecord<>(persistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, key, 0)
                     );
                 }
 
@@ -320,7 +322,7 @@ public class ProcessorStateManagerTest {
                     int key = i;
                     expectedKeys.add(i);
                     restoreConsumer.bufferRecord(
-                            new ConsumerRecord<>(nonPersistentStoreTopicName, 2, offset, key, 0)
+                            new ConsumerRecord<>(nonPersistentStoreTopicName, 2, 0L, offset, TimestampType.CREATE_TIME, key, 0)
                     );
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 6e86410..614e2c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -50,9 +51,9 @@ public class RecordQueueTest {
 
         // add three out-of-order records with timestamp 2, 1, 3
         List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
 
         queue.addRawRecords(list1, timestampExtractor);
 
@@ -72,9 +73,9 @@ public class RecordQueueTest {
         // add three out-of-order records with timestamp 4, 1, 2
         // now with 3, 4, 1, 2
         List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
 
         queue.addRawRecords(list2, timestampExtractor);
 
@@ -99,9 +100,9 @@ public class RecordQueueTest {
 
         // add three more records with 4, 5, 6
         List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 5, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 6, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, recordKey, recordValue));
 
         queue.addRawRecords(list3, timestampExtractor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index fd6f49f..e0be587 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
@@ -152,7 +153,7 @@ public class StandbyTaskTest {
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
             task.update(partition1,
-                    records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue))
+                    records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue))
             );
 
         } finally {
@@ -171,9 +172,9 @@ public class StandbyTaskTest {
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
             for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 1, 100),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 2, 100),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 3, 100))) {
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 1, 100),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 2, 100),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 3, 100))) {
                 restoreStateConsumer.bufferRecord(record);
             }
 
@@ -234,11 +235,11 @@ public class StandbyTaskTest {
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
             for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 1, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 2, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 3, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 4, 100),
-                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 5, 100))) {
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 10, 0L, TimestampType.CREATE_TIME, 1, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 20, 0L, TimestampType.CREATE_TIME, 2, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 30, 0L, TimestampType.CREATE_TIME, 3, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 40, 0L, TimestampType.CREATE_TIME, 4, 100),
+                    new ConsumerRecord<>(ktable.topic(), ktable.partition(), 50, 0L, TimestampType.CREATE_TIME, 5, 100))) {
                 restoreStateConsumer.bufferRecord(record);
             }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index bf3b3b1..94f0ce3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -107,15 +108,15 @@ public class StreamTaskTest {
             StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue)
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
             ));
 
             task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue)
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
             ));
 
             assertEquals(5, task.process());
@@ -158,15 +159,15 @@ public class StreamTaskTest {
             StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue)
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
             ));
 
             task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue)
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
             ));
 
             assertEquals(5, task.process());
@@ -177,9 +178,9 @@ public class StreamTaskTest {
             assertTrue(consumer.paused().contains(partition2));
 
             task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue)
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, 0L, TimestampType.CREATE_TIME, recordKey, recordValue)
             ));
 
             assertEquals(2, consumer.paused().size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c8195f/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 5edff28..e414d80 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
@@ -200,7 +201,7 @@ public class ProcessorTopologyTestDriver {
         }
         // Add the record ...
         long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
-        task.addRecords(tp, records(new ConsumerRecord<byte[], byte[]>(tp.topic(), tp.partition(), offset, key, value)));
+        task.addRecords(tp, records(new ConsumerRecord<byte[], byte[]>(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, key, value)));
         producer.clear();
         // Process the record ...
         task.process();


Mime
View raw message