kafka-commits mailing list archives

From jun...@apache.org
Subject [1/2] kafka git commit: KAFKA-3163; Add time based index to Kafka.
Date Fri, 19 Aug 2016 17:07:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 05ed54bf2 -> 79d3fd2bf


http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 15d4eea..aadda86 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -400,7 +400,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
    * If no format conversion or value overwriting is required for messages, this method will perform in-place
    * operations and avoid re-compression.
    *
-   * Returns the message set and a boolean indicating whether the message sizes may have changed.
+   * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
+   * of the shallow message with the max timestamp, and a boolean indicating whether the message sizes may have changed.
    */
   private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
                                                       now: Long,
@@ -409,18 +410,16 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
                                                       compactedTopic: Boolean = false,
                                                       messageFormatVersion: Byte = Message.CurrentMagicValue,
                                                       messageTimestampType: TimestampType,
-                                                      messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = {
+                                                      messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
-      if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) {
-        // Message format conversion
-        (convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
-          messageFormatVersion), true)
-      } else {
+      if (!isMagicValueInAllWrapperMessages(messageFormatVersion))
+        convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
+          messageFormatVersion)
+      else
         // Do in-place validation, offset assignment and maybe set timestamp
-        (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
-          messageTimestampDiffMaxMs), false)
-      }
+        validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
+          messageTimestampDiffMaxMs)
     } else {
       // Deal with compressed messages
       // We cannot do in place assignment in one of the following situations:
@@ -433,6 +432,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
       var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0
 
       var maxTimestamp = Message.NoTimestamp
+      var offsetOfMaxTimestamp = -1L
       val expectedInnerOffset = new LongRef(0)
       val validatedMessages = new mutable.ArrayBuffer[Message]
       this.internalIterator(isShallow = false).foreach { messageAndOffset =>
@@ -446,7 +446,10 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           // Check if we need to overwrite offset
           if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
             inPlaceAssignment = false
-          maxTimestamp = math.max(maxTimestamp, message.timestamp)
+          if (message.timestamp > maxTimestamp) {
+            maxTimestamp = message.timestamp
+            offsetOfMaxTimestamp = offsetCounter.value + expectedInnerOffset.value - 1
+          }
         }
 
         if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
@@ -462,20 +465,23 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
 
       if (!inPlaceAssignment) {
         // Cannot do in place assignment.
-        val wrapperMessageTimestamp = {
+        val (largestTimestampOfMessageSet, offsetOfMaxTimestampInMessageSet) = {
           if (messageFormatVersion == Message.MagicValue_V0)
-            Some(Message.NoTimestamp)
-          else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME)
-            Some(maxTimestamp)
+            (Some(Message.NoTimestamp), -1L)
+          else if (messageTimestampType == TimestampType.CREATE_TIME)
+            (Some(maxTimestamp), {if (targetCodec == NoCompressionCodec) offsetOfMaxTimestamp else offsetCounter.value + validatedMessages.length - 1})
           else // Log append time
-            Some(now)
+            (Some(now), {if (targetCodec == NoCompressionCodec) offsetCounter.value else offsetCounter.value + validatedMessages.length - 1})
         }
 
-        (new ByteBufferMessageSet(compressionCodec = targetCodec,
-                                  offsetCounter = offsetCounter,
-                                  wrapperMessageTimestamp = wrapperMessageTimestamp,
-                                  timestampType = messageTimestampType,
-                                  messages = validatedMessages: _*), true)
+        ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(compressionCodec = targetCodec,
+                                                                                     offsetCounter = offsetCounter,
+                                                                                     wrapperMessageTimestamp = largestTimestampOfMessageSet,
+                                                                                     timestampType = messageTimestampType,
+                                                                                     messages = validatedMessages: _*),
+                                        maxTimestamp = largestTimestampOfMessageSet.get,
+                                        offsetOfMaxTimestamp = offsetOfMaxTimestampInMessageSet,
+                                        messageSizeMaybeChanged = true)
       } else {
         // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
         buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
@@ -487,6 +493,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
         val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset
         val timestamp = buffer.getLong(timestampOffset)
         val attributes = buffer.get(attributeOffset)
+        buffer.putLong(timestampOffset, maxTimestamp)
         if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
           // We don't need to recompute crc if the timestamp is not updated.
           crcUpdateNeeded = false
@@ -503,7 +510,11 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum)
         }
         buffer.rewind()
-        (this, false)
+        // For compressed messages, the max timestamp and its offset are those of the updated wrapper message.
+        ValidationAndOffsetAssignResult(validatedMessages = this,
+                                        maxTimestamp = buffer.getLong(timestampOffset),
+                                        offsetOfMaxTimestamp = buffer.getLong(0),
+                                        messageSizeMaybeChanged = false)
       }
     }
   }
@@ -516,12 +527,14 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
                                            now: Long,
                                            timestampType: TimestampType,
                                            messageTimestampDiffMaxMs: Long,
-                                           toMagicValue: Byte): ByteBufferMessageSet = {
+                                           toMagicValue: Byte): ValidationAndOffsetAssignResult = {
     val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).map { messageAndOffset =>
       Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue)
     }.sum
     val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
     var newMessagePosition = 0
+    var maxTimestamp = Message.NoTimestamp
+    var offsetOfMaxTimestamp = -1L
     this.internalIterator(isShallow = true).foreach { case MessageAndOffset(message, _) =>
       validateMessageKey(message, compactedTopic)
       validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs)
@@ -532,20 +545,31 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
       val newMessageBuffer = newBuffer.slice()
       newMessageBuffer.limit(newMessageSize)
       message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType)
-
+      if (toMagicValue > Message.MagicValue_V0) {
+        val timestamp = newMessageBuffer.getLong(Message.TimestampOffset)
+        if (maxTimestamp < timestamp) {
+          maxTimestamp = timestamp
+          offsetOfMaxTimestamp = offsetCounter.value - 1
+        }
+      }
       newMessagePosition += MessageSet.LogOverhead + newMessageSize
     }
     newBuffer.rewind()
-    new ByteBufferMessageSet(newBuffer)
+    new ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(newBuffer),
+                                        maxTimestamp = maxTimestamp,
+                                        offsetOfMaxTimestamp = offsetOfMaxTimestamp,
+                                        messageSizeMaybeChanged = true)
   }
 
   private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: LongRef,
                                                                   now: Long,
                                                                   compactedTopic: Boolean,
                                                                   timestampType: TimestampType,
-                                                                  timestampDiffMaxMs: Long): ByteBufferMessageSet = {
+                                                                  timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
     // do in-place validation and offset assignment
     var messagePosition = 0
+    var maxTimestamp = Message.NoTimestamp
+    var offsetOfMaxTimestamp = -1L
     buffer.mark()
     while (messagePosition < sizeInBytes - MessageSet.LogOverhead) {
       buffer.position(messagePosition)
@@ -562,11 +586,19 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           message.buffer.put(Message.AttributesOffset, timestampType.updateAttributes(message.attributes))
           Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum)
         }
+        if (message.timestamp > maxTimestamp) {
+          maxTimestamp = message.timestamp
+          offsetOfMaxTimestamp = offsetCounter.value - 1
+        }
       }
+
       messagePosition += MessageSet.LogOverhead + messageSize
     }
     buffer.reset()
-    this
+    ValidationAndOffsetAssignResult(validatedMessages = this,
+                                    maxTimestamp = maxTimestamp,
+                                    offsetOfMaxTimestamp = offsetOfMaxTimestamp,
+                                    messageSizeMaybeChanged = false)
   }
 
   private def validateMessageKey(message: Message, compactedTopic: Boolean) {
@@ -614,3 +646,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   override def hashCode: Int = buffer.hashCode
 
 }
+
+case class ValidationAndOffsetAssignResult(validatedMessages: ByteBufferMessageSet,
+                                           maxTimestamp: Long,
+                                           offsetOfMaxTimestamp: Long,
+                                           messageSizeMaybeChanged: Boolean)
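
The new ValidationAndOffsetAssignResult carries everything the caller needs to build a time index entry for an append without re-scanning the messages. A minimal caller-side sketch, not part of this commit (the offset counter, codecs and timestamp settings are illustrative):

    // Hedged sketch of consuming the new result type (e.g. from within kafka.log).
    val result = messageSet.validateMessagesAndAssignOffsets(
      offsetCounter = new LongRef(0L),
      now = System.currentTimeMillis(),
      sourceCodec = NoCompressionCodec,
      targetCodec = NoCompressionCodec,
      messageTimestampType = TimestampType.CREATE_TIME,
      messageTimestampDiffMaxMs = Long.MaxValue)
    val validated = result.validatedMessages      // offsets (and possibly timestamps) assigned
    val maxTs     = result.maxTimestamp           // used for the segment's time index entry
    val maxTsAt   = result.offsetOfMaxTimestamp
    if (result.messageSizeMaybeChanged) {
      // re-check size limits: conversion or re-compression may have grown individual messages
    }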

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/message/MessageAndOffset.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala
index 51edf9f..fab6898 100644
--- a/core/src/main/scala/kafka/message/MessageAndOffset.scala
+++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala
@@ -24,5 +24,13 @@ case class MessageAndOffset(message: Message, offset: Long) {
    * Compute the offset of the next message in the log
    */
   def nextOffset: Long = offset + 1
+
+  /**
+   * We need to decompress the message, if required, to get the offset of the first uncompressed message.
+   */
+  def firstOffset: Long = message.compressionCodec match {
+    case NoCompressionCodec => offset
+    case _ => ByteBufferMessageSet.deepIterator(this).next().offset
+  }
 }
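
For context: firstOffset differs from the shallow offset only when the entry is a compressed wrapper. A hedged sketch (offsets are illustrative; shallowIterator is assumed to be the existing shallow iterator over wrapper messages):

    // Uncompressed message: firstOffset is simply the message's own offset.
    MessageAndOffset(new Message("a".getBytes), 5L).firstOffset            // => 5

    // Compressed wrapper whose inner messages were assigned offsets starting at 100:
    val wrapper = new ByteBufferMessageSet(
      compressionCodec = GZIPCompressionCodec,
      offsetCounter = new LongRef(100L),
      messages = Seq("a", "b", "c").map(s => new Message(s.getBytes)): _*)
    wrapper.shallowIterator.next().firstOffset                             // => 100, found by decompressing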
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bb219ca..6eb574f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -38,10 +38,11 @@ import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
-import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, SaslHandshakeResponse, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse}
+import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.requests.SaslHandshakeResponse
 
 import scala.collection._
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2b97783..f94cbf9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -335,7 +335,7 @@ class ReplicaManager(val config: KafkaConfig,
         topicPartition ->
                 ProducePartitionStatus(
                   result.info.lastOffset + 1, // required offset
-                  new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp)) // response status
+                  new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.logAppendTime)) // response status
       }
 
       if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index dc99672..0a659f4 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -25,12 +25,13 @@ import kafka.coordinator.{GroupMetadataKey, GroupMetadataManager, OffsetKey}
 import kafka.log._
 import kafka.message._
 import kafka.serializer.Decoder
-import kafka.utils.{VerifiableProperties, _}
+import kafka.utils._
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 object DumpLogSegments {
 
@@ -85,6 +86,7 @@ object DumpLogSegments {
     }
 
     val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
+    val timeIndexDumpErrors = new TimeIndexDumpErrors
     val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
 
     for(arg <- files) {
@@ -95,8 +97,12 @@ object DumpLogSegments {
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
         dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
+      } else if(file.getName.endsWith(Log.TimeIndexFileSuffix)) {
+        println("Dumping " + file)
+        dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
       }
     }
+
     misMatchesForIndexFilesMap.foreach {
       case (fileName, listOfMismatches) => {
         System.err.println("Mismatches in :" + fileName)
@@ -105,6 +111,9 @@ object DumpLogSegments {
         })
       }
     }
+
+    timeIndexDumpErrors.printErrors()
+
     nonConsecutivePairsForLogFilesMap.foreach {
       case (fileName, listOfNonConsecutivePairs) => {
         System.err.println("Non-secutive offsets in :" + fileName)
@@ -150,6 +159,58 @@ object DumpLogSegments {
     }
   }
 
+  private def dumpTimeIndex(file: File,
+                            indexSanityOnly: Boolean,
+                            verifyOnly: Boolean,
+                            timeIndexDumpErrors: TimeIndexDumpErrors,
+                            maxMessageSize: Int) {
+    val startOffset = file.getName().split("\\.")(0).toLong
+    val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
+    val messageSet = new FileMessageSet(logFile, false)
+    val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.IndexFileSuffix)
+    val index = new OffsetIndex(indexFile, baseOffset = startOffset)
+    val timeIndex = new TimeIndex(file, baseOffset = startOffset)
+
+    // Check that the index passes the sanity check; this determines whether indexes will be rebuilt on startup.
+    if (indexSanityOnly) {
+      timeIndex.sanityCheck
+      println(s"$file passed sanity check.")
+      return
+    }
+
+    var prevTimestamp = Message.NoTimestamp
+    for(i <- 0 until timeIndex.entries) {
+      val entry = timeIndex.entry(i)
+      val position = index.lookup(entry.offset + timeIndex.baseOffset).position
+      val partialFileMessageSet: FileMessageSet = messageSet.read(position, Int.MaxValue)
+      val shallowIter = partialFileMessageSet.iterator
+      var maxTimestamp = Message.NoTimestamp
+      // We first find the message by offset then check if the timestamp is correct.
+      val wrapperMessageOpt = shallowIter.find(_.offset >= entry.offset + timeIndex.baseOffset)
+      if (!wrapperMessageOpt.isDefined || wrapperMessageOpt.get.offset != entry.offset + timeIndex.baseOffset) {
+        timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset,
+          {if (wrapperMessageOpt.isDefined) wrapperMessageOpt.get.offset else -1})
+      } else {
+        val deepIter = getIterator(wrapperMessageOpt.get, isDeepIteration = true)
+        for (messageAndOffset <- deepIter)
+          maxTimestamp = math.max(maxTimestamp, messageAndOffset.message.timestamp)
+
+        if (maxTimestamp != entry.timestamp)
+          timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, maxTimestamp)
+
+        if (prevTimestamp >= entry.timestamp)
+          timeIndexDumpErrors.recordOutOfOrderIndexTimestamp(file, entry.timestamp, prevTimestamp)
+
+        // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
+        if (entry.offset == 0 && i > 0)
+          return
+      }
+      if (!verifyOnly)
+        println("timestamp: %s offset: %s".format(entry.timestamp, timeIndex.baseOffset + entry.offset))
+      prevTimestamp = entry.timestamp
+    }
+  }
+
   private trait MessageParser[K, V] {
     def parse(message: Message): (Option[K], Option[V])
   }
@@ -261,7 +322,8 @@ object DumpLogSegments {
         }
         lastOffset = messageAndOffset.offset
 
-        print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
+        print("offset: " + messageAndOffset.offset + " position: " + validBytes +
+              " " + msg.timestampType + ": " + msg.timestamp + " isvalid: " + msg.isValid +
               " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
               " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
         if(msg.hasKey)
@@ -307,4 +369,60 @@ object DumpLogSegments {
     }
   }
 
+  class TimeIndexDumpErrors {
+    val misMatchesForTimeIndexFilesMap = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
+    val outOfOrderTimestamp = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
+    val shallowOffsetNotFound = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
+
+    def recordMismatchTimeIndex(file: File, indexTimestamp: Long, logTimestamp: Long) {
+      var misMatchesSeq = misMatchesForTimeIndexFilesMap.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+      if (misMatchesSeq.isEmpty)
+        misMatchesForTimeIndexFilesMap.put(file.getAbsolutePath, misMatchesSeq)
+      misMatchesSeq += ((indexTimestamp, logTimestamp))
+    }
+
+    def recordOutOfOrderIndexTimestamp(file: File, indexTimestamp: Long, prevIndexTimestamp: Long) {
+      var outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+      if (outOfOrderSeq.isEmpty)
+        outOfOrderTimestamp.put(file.getAbsolutePath, outOfOrderSeq)
+      outOfOrderSeq += ((indexTimestamp, prevIndexTimestamp))
+    }
+
+    def recordShallowOffsetNotFound(file: File, indexOffset: Long, logOffset: Long) {
+      var shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+      if (shallowOffsetNotFoundSeq.isEmpty)
+        shallowOffsetNotFound.put(file.getAbsolutePath, shallowOffsetNotFoundSeq)
+      shallowOffsetNotFoundSeq += ((indexOffset, logOffset))
+    }
+
+    def printErrors() {
+      misMatchesForTimeIndexFilesMap.foreach {
+        case (fileName, listOfMismatches) => {
+          System.err.println("Found timestamp mismatch in :" + fileName)
+          listOfMismatches.foreach(m => {
+            System.err.println("  Index timestamp: %d, log timestamp: %d".format(m._1, m._2))
+          })
+        }
+      }
+
+      outOfOrderTimestamp.foreach {
+        case (fileName, outOfOrderTimestamps) => {
+          System.err.println("Found out of order timestamp in :" + fileName)
+          outOfOrderTimestamps.foreach(m => {
+            System.err.println("  Index timestamp: %d, Previously indexed timestamp: %d".format(m._1, m._2))
+          })
+        }
+      }
+
+      shallowOffsetNotFound.foreach {
+        case (fileName, listOfShallowOffsetNotFound) => {
+          System.err.println("The following indexed offsets are not found in the log.")
+          listOfShallowOffsetNotFound.foreach(m => {
+            System.err.println("Indexed offset: %s, found log offset: %s".format(m._1, m._2))
+          })
+        }
+      }
+    }
+  }
+
 }
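
With this change DumpLogSegments also understands .timeindex files: for each entry it prints a "timestamp: ... offset: ..." line and records mismatched, out-of-order, or missing entries via TimeIndexDumpErrors. A hedged invocation sketch (the file path is illustrative):

    // Shell equivalent: bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files <segment>.timeindex
    kafka.tools.DumpLogSegments.main(Array(
      "--files", "/tmp/kafka-logs/my-topic-0/00000000000000000000.timeindex"))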

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 8212121..15920ad 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -233,7 +233,7 @@ class CleanerTest extends JUnitSuite {
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
     
     // check grouping by index size
-    val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes()).sum + 1
+    val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
     checkSegmentOrder(groups)
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
@@ -391,8 +391,9 @@ class CleanerTest extends JUnitSuite {
     for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
       Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
     }   
+    System.out.println("here")
     log = recoverAndCheck(config, cleanedKeys)
-    
+
     // add some more messages and clean the log again
     while(log.numberOfSegments < 10) {
       log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index a64454d..82496f2 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -111,21 +111,21 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
     var position = 0
     assertEquals("Should be able to find the first message by its offset", 
                  OffsetPosition(0L, position), 
-                 messageSet.searchFor(0, 0))
+                 messageSet.searchForOffset(0, 0))
     position += MessageSet.entrySize(messageSet.head.message)
     assertEquals("Should be able to find second message when starting from 0", 
                  OffsetPosition(1L, position), 
-                 messageSet.searchFor(1, 0))
+                 messageSet.searchForOffset(1, 0))
     assertEquals("Should be able to find second message starting from its offset", 
                  OffsetPosition(1L, position), 
-                 messageSet.searchFor(1, position))
+                 messageSet.searchForOffset(1, position))
     position += MessageSet.entrySize(messageSet.tail.head.message) + MessageSet.entrySize(messageSet.tail.tail.head.message)
     assertEquals("Should be able to find fourth message from a non-existant offset", 
                  OffsetPosition(50L, position), 
-                 messageSet.searchFor(3, position))
+                 messageSet.searchForOffset(3, position))
     assertEquals("Should be able to find fourth message by correct offset", 
                  OffsetPosition(50L, position), 
-                 messageSet.searchFor(50,  position))
+                 messageSet.searchForOffset(50,  position))
   }
   
   /**
@@ -134,7 +134,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testIteratorWithLimits() {
     val message = messageSet.toList(1)
-    val start = messageSet.searchFor(1, 0).position
+    val start = messageSet.searchForOffset(1, 0).position
     val size = message.message.size + 12
     val slice = messageSet.read(start, size)
     assertEquals(List(message), slice.toList)
@@ -148,7 +148,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testTruncate() {
     val message = messageSet.toList.head
-    val end = messageSet.searchFor(1, 0).position
+    val end = messageSet.searchForOffset(1, 0).position
     messageSet.truncateTo(end)
     assertEquals(List(message), messageSet.toList)
     assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes)
@@ -272,7 +272,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testFormatConversionWithPartialMessage() {
     val message = messageSet.toList(1)
-    val start = messageSet.searchFor(1, 0).position
+    val start = messageSet.searchForOffset(1, 0).position
     val size = message.message.size + 12
     val slice = messageSet.read(start, size - 1)
     val messageV0 = slice.toMessageFormat(Message.MagicValue_V0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 7b52a09..dc4cc79 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -100,7 +100,7 @@ class LogManagerTest {
     time.sleep(maxLogAgeMs + 1)
     assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
-    assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
+    assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes)
 
     try {
@@ -146,7 +146,7 @@ class LogManagerTest {
     time.sleep(logManager.InitialTaskDelayMs)
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
-    assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
+    assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes)
     try {
       log.read(0, 1024)
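
The expected file count per segment grows from 2 to 3 because each segment now has a time index file alongside its log and offset index files. A small sketch of the three per-segment file names (directory and base offset are illustrative; logFilename and indexFilename are existing helpers, timeIndexFilename is added by this patch):

    val dir = new java.io.File("/tmp/kafka-logs/my-topic-0")
    Log.logFilename(dir, 0L)        // 00000000000000000000.log        (messages)
    Log.indexFilename(dir, 0L)      // 00000000000000000000.index      (offset index)
    Log.timeIndexFilename(dir, 0L)  // 00000000000000000000.timeindex  (time index)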

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index edbfd99..64140e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -26,19 +26,23 @@ import kafka.message._
 import kafka.utils.SystemTime
 
 import scala.collection._
+ import scala.collection.mutable.ListBuffer
 
 class LogSegmentTest {
   
   val segments = mutable.ArrayBuffer[LogSegment]()
   
   /* create a segment with the given base offset */
-  def createSegment(offset: Long): LogSegment = {
+  def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
     val msFile = TestUtils.tempFile()
     val ms = new FileMessageSet(msFile)
     val idxFile = TestUtils.tempFile()
+    val timeIdxFile = TestUtils.tempFile()
     idxFile.delete()
+    timeIdxFile.delete()
     val idx = new OffsetIndex(idxFile, offset, 1000)
-    val seg = new LogSegment(ms, idx, offset, 10, 0, SystemTime)
+    val timeIdx = new TimeIndex(timeIdxFile, offset, 1500)
+    val seg = new LogSegment(ms, idx, timeIdx, offset, indexIntervalBytes, 0, SystemTime)
     segments += seg
     seg
   }
@@ -47,7 +51,7 @@ class LogSegmentTest {
   def messages(offset: Long, messages: String*): ByteBufferMessageSet = {
     new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, 
                              offsetCounter = new LongRef(offset),
-                             messages = messages.map(s => new Message(s.getBytes)):_*)
+                             messages = messages.map(s => new Message(s.getBytes, offset * 10, Message.MagicValue_V1)):_*)
   }
   
   @After
@@ -76,7 +80,7 @@ class LogSegmentTest {
   def testReadBeforeFirstOffset() {
     val seg = createSegment(40)
     val ms = messages(50, "hello", "there", "little", "bee")
-    seg.append(50, ms)
+    seg.append(50, Message.NoTimestamp, -1L, ms)
     val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).messageSet
     assertEquals(ms.toList, read.toList)
   }
@@ -90,7 +94,7 @@ class LogSegmentTest {
     val baseOffset = 50
     val seg = createSegment(baseOffset)
     val ms = messages(baseOffset, "hello", "there", "beautiful")
-    seg.append(baseOffset, ms)
+    seg.append(baseOffset, Message.NoTimestamp, -1L, ms)
     def validate(offset: Long) = 
       assertEquals(ms.filter(_.offset == offset).toList, 
                    seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).messageSet.toList)
@@ -106,7 +110,7 @@ class LogSegmentTest {
   def testReadAfterLast() {
     val seg = createSegment(40)
     val ms = messages(50, "hello", "there")
-    seg.append(50, ms)
+    seg.append(50, Message.NoTimestamp, -1L, ms)
     val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
     assertNull("Read beyond the last offset in the segment should give null", read)
   }
@@ -119,9 +123,9 @@ class LogSegmentTest {
   def testReadFromGap() {
     val seg = createSegment(40)
     val ms = messages(50, "hello", "there")
-    seg.append(50, ms)
+    seg.append(50, Message.NoTimestamp, -1L, ms)
     val ms2 = messages(60, "alpha", "beta")
-    seg.append(60, ms2)
+    seg.append(60, Message.NoTimestamp, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     assertEquals(ms2.toList, read.messageSet.toList)
   }
@@ -136,9 +140,9 @@ class LogSegmentTest {
     var offset = 40
     for(i <- 0 until 30) {
       val ms1 = messages(offset, "hello")
-      seg.append(offset, ms1)
-      val ms2 = messages(offset+1, "hello")
-      seg.append(offset+1, ms2)
+      seg.append(offset, Message.NoTimestamp, -1L, ms1)
+      val ms2 = messages(offset + 1, "hello")
+      seg.append(offset + 1, Message.NoTimestamp, -1L, ms2)
       // check that we can read back both messages
       val read = seg.read(offset, None, 10000)
       assertEquals(List(ms1.head, ms2.head), read.messageSet.toList)
@@ -150,7 +154,25 @@ class LogSegmentTest {
       offset += 1
     }
   }
-  
+
+  @Test
+  def testReloadLargestTimestampAfterTruncation() {
+    val numMessages = 30
+    val seg = createSegment(40, 2 * messages(0, "hello").sizeInBytes - 1)
+    var offset = 40
+    for (i <- 0 until numMessages) {
+      seg.append(offset, offset, offset, messages(offset, "hello"))
+      offset += 1
+    }
+    val expectedNumEntries = numMessages / 2 - 1
+    assertEquals(s"Should have $expectedNumEntries time indexes", expectedNumEntries, seg.timeIndex.entries)
+
+    seg.truncateTo(41)
+    assertEquals(s"Should have 0 time indexes", 0, seg.timeIndex.entries)
+    assertEquals(s"Largest timestamp should be 400", 400L, seg.largestTimestamp)
+
+  }
+
   /**
    * Test truncating the whole segment, and check that we can reappend with the original offset.
    */
@@ -158,12 +180,38 @@ class LogSegmentTest {
   def testTruncateFull() {
     // test the case where we fully truncate the log
     val seg = createSegment(40)
-    seg.append(40, messages(40, "hello", "there"))
+    seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there"))
     seg.truncateTo(0)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
-    seg.append(40, messages(40, "hello", "there"))    
+    seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there"))
   }
-  
+
+  /**
+   * Append messages with timestamp and search message by timestamp.
+   */
+  @Test
+  def testFindOffsetByTimestamp() {
+    val messageSize = messages(0, s"msg00").sizeInBytes
+    val seg = createSegment(40, messageSize * 2 - 1)
+    // Produce some messages
+    for (i <- 40 until 50)
+      seg.append(i, i * 10, i, messages(i, s"msg$i"))
+
+    assertEquals(490, seg.largestTimestamp)
+    // Search for an indexed timestamp
+    assertEquals(42, seg.findOffsetByTimestamp(420).get)
+    assertEquals(43, seg.findOffsetByTimestamp(421).get)
+    // Search for an un-indexed timestamp
+    assertEquals(43, seg.findOffsetByTimestamp(430).get)
+    assertEquals(44, seg.findOffsetByTimestamp(431).get)
+    // Search beyond the last timestamp
+    assertEquals(50, seg.findOffsetByTimestamp(491).get)
+    // Search before the first indexed timestamp
+    assertEquals(41, seg.findOffsetByTimestamp(401).get)
+    // Search before the first timestamp
+    assertEquals(40, seg.findOffsetByTimestamp(399).get)
+  }
+
   /**
    * Test that offsets are assigned sequentially and that the nextOffset variable is incremented
    */
@@ -171,7 +219,7 @@ class LogSegmentTest {
   def testNextOffsetCalculation() {
     val seg = createSegment(40)
     assertEquals(40, seg.nextOffset)
-    seg.append(50, messages(50, "hello", "there", "you"))
+    seg.append(50, Message.NoTimestamp, -1L, messages(50, "hello", "there", "you"))
     assertEquals(53, seg.nextOffset())
   }
   
@@ -198,13 +246,31 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, messages(i, i.toString))
+      seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
     val indexFile = seg.index.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100)
       assertEquals(i, seg.read(i, Some(i+1), 1024).messageSet.head.offset)
   }
+
+  /**
+   * Create a segment with some data and an index. Then corrupt the index,
+   * and recover the segment, the entries should all be readable.
+   */
+  @Test
+  def testRecoveryFixesCorruptTimeIndex() {
+    val seg = createSegment(0)
+    for(i <- 0 until 100)
+      seg.append(i, i * 10, i, messages(i, i.toString))
+    val timeIndexFile = seg.timeIndex.file
+    TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
+    seg.recover(64*1024)
+    for(i <- 0 until 100) {
+      assertEquals(i, seg.findOffsetByTimestamp(i * 10).get)
+      assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get)
+    }
+  }
   
   /**
    * Randomly corrupt a log a number of times and attempt recovery.
@@ -215,10 +281,10 @@ class LogSegmentTest {
     for(iteration <- 0 until 10) {
       val seg = createSegment(0)
       for(i <- 0 until messagesAppended)
-        seg.append(i, messages(i, i.toString))
+        seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
       val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the way to the end
-      val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15)
+      val position = seg.log.searchForOffset(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15)
       TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position)
       seg.recover(64*1024)
       assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)
@@ -227,7 +293,7 @@ class LogSegmentTest {
   }
 
   /* create a segment with   pre allocate */
-  def createSegment(offset: Long, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): LogSegment = {
+  def createSegment(offset: Long, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean): LogSegment = {
     val tempDir = TestUtils.tempDir()
     val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate)
     segments += seg
@@ -239,9 +305,9 @@ class LogSegmentTest {
   def testCreateWithInitFileSizeAppendMessage() {
     val seg = createSegment(40, false, 512*1024*1024, true)
     val ms = messages(50, "hello", "there")
-    seg.append(50, ms)
+    seg.append(50, Message.NoTimestamp, -1L, ms)
     val ms2 = messages(60, "alpha", "beta")
-    seg.append(60, ms2)
+    seg.append(60, Message.NoTimestamp, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     assertEquals(ms2.toList, read.messageSet.toList)
   }
@@ -253,9 +319,9 @@ class LogSegmentTest {
     val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true)
 
     val ms = messages(50, "hello", "there")
-    seg.append(50, ms)
+    seg.append(50, Message.NoTimestamp, -1L, ms)
     val ms2 = messages(60, "alpha", "beta")
-    seg.append(60, ms2)
+    seg.append(60, Message.NoTimestamp, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     assertEquals(ms2.toList, read.messageSet.toList)
     val oldSize = seg.log.sizeInBytes()
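
LogSegment.append now takes the largest timestamp of the appended messages and the offset at which it occurs, which is why the calls above pass Message.NoTimestamp and -1L when timestamps are irrelevant. A hedged sketch of an append that does carry a timestamp, reusing this test's helpers:

    val seg = createSegment(40)
    val ms  = messages(40, "hello")   // one message at offset 40 with timestamp 400 (offset * 10, per the helper)
    seg.append(40, 400L, 40L, ms)     // (first offset, largest timestamp, offset of largest timestamp, messages)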

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 33dd68e..2466ef2 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -35,7 +35,7 @@ class LogTest extends JUnitSuite {
 
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
-  val time = new MockTime(0)
+  val time = new MockTime(100)
   var config: KafkaConfig = null
   val logConfig = LogConfig()
 
@@ -88,6 +88,20 @@ class LogTest extends JUnitSuite {
       assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
     }
 
+    time.sleep(log.config.segmentMs + 1)
+    val setWithTimestamp =
+      TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1)
+    log.append(setWithTimestamp)
+    assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments)
+
+    time.sleep(log.config.segmentMs + 1)
+    log.append(set)
+    assertEquals("Log should not roll because the roll should depend on the index of the first time index entry.", 5, log.numberOfSegments)
+
+    time.sleep(log.config.segmentMs + 1)
+    log.append(set)
+    assertEquals("Log should roll because the time since the timestamp of first time index entry has expired.", 6, log.numberOfSegments)
+
     val numSegments = log.numberOfSegments
     time.sleep(log.config.segmentMs + 1)
     log.append(new ByteBufferMessageSet())
@@ -457,28 +471,64 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
+      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(messageSize),
+        timestamp = time.milliseconds + i * 10))
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
     val lastIndexOffset = log.activeSegment.index.lastOffset
     val numIndexEntries = log.activeSegment.index.entries
     val lastOffset = log.logEndOffset
+    // After segment is closed, the last entry in the time index should be (largest timestamp -> last offset).
+    val lastTimeIndexOffset = log.logEndOffset - 1
+    val lastTimeIndexTimestamp  = log.activeSegment.largestTimestamp
+    // Depending on when the last time index entry is inserted, an entry may or may not be inserted into the time index.
+    val numTimeIndexEntries = log.activeSegment.timeIndex.entries + {
+      if (log.activeSegment.timeIndex.lastEntry.offset == log.logEndOffset - 1) 0 else 1
+    }
     log.close()
 
+    def verifyRecoveredLog(log: Log) {
+      assertEquals(s"Should have $numMessages messages when log is reopened w/o recovery", numMessages, log.logEndOffset)
+      assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
+      assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
+      assertEquals("Should have same last time index timestamp", lastTimeIndexTimestamp, log.activeSegment.timeIndex.lastEntry.timestamp)
+      assertEquals("Should have same last time index offset", lastTimeIndexOffset, log.activeSegment.timeIndex.lastEntry.offset)
+      assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
+    }
+
     log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
-    assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
-    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
-    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
+    verifyRecoveredLog(log)
     log.close()
 
     // test recovery case
     log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
-    assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
-    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
-    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
+    verifyRecoveredLog(log)
     log.close()
   }
 
   /**
+   * Test building the time index on the follower by setting assignOffsets to false.
+   */
+  @Test
+  def testBuildTimeIndexWhenNotAssigningOffsets() {
+    val numMessages = 100
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 10000: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val config = LogConfig(logProps)
+    val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+
+    val messages = (0 until numMessages).map { i =>
+      new ByteBufferMessageSet(NoCompressionCodec, new LongRef(100 + i), new Message(i.toString.getBytes(), time.milliseconds + i, Message.MagicValue_V1))
+    }
+    messages.foreach(log.append(_, assignOffsets = false))
+    val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries }
+    assertEquals(s"There should be ${numMessages - 1} time index entries", numMessages - 1, timeIndexEntries)
+    assertEquals(s"The last time index entry should have timestamp ${time.milliseconds + numMessages - 1}",
+      time.milliseconds + numMessages - 1, log.activeSegment.timeIndex.lastEntry.timestamp)
+  }
+
+  /**
    * Test that if we manually delete an index segment it is rebuilt when the log is re-opened
    */
   @Test
@@ -492,19 +542,58 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
+      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
+    val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
     // delete all the index files
     indexFiles.foreach(_.delete())
+    timeIndexFiles.foreach(_.delete())
 
     // reopen the log
     log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
-    for(i <- 0 until numMessages)
+    assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
+    assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
+    for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
+      if (i == 0)
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+      else
+        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+    }
+    log.close()
+  }
+
+  /**
+   * Test that if the message format version of the messages in a segment is before 0.10.0, the time index should be empty.
+   */
+  @Test
+  def testRebuildTimeIndexForOldMessages() {
+    val numMessages = 200
+    val segmentSize = 200
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+    logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
+
+    val config = LogConfig(logProps)
+    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    for(i <- 0 until numMessages)
+      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
+    val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
+
+    // Delete the time index.
+    timeIndexFiles.foreach(_.delete())
+
+    // The rebuilt time index should be empty
+    log = new Log(logDir, config, recoveryPoint = numMessages + 1, time.scheduler, time)
+    val segArray = log.logSegments.toArray
+    for (i <- 0 until segArray.size - 1)
+      assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
+
   }
 
   /**
@@ -521,8 +610,9 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
+      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
+    val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
     // corrupt all the index files
@@ -532,11 +622,23 @@ class LogTest extends JUnitSuite {
       bw.close()
     }
 
+    // corrupt all the index files
+    for( file <- timeIndexFiles) {
+      val bw = new BufferedWriter(new FileWriter(file))
+      bw.write("  ")
+      bw.close()
+    }
+
     // reopen the log
     log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
-    for(i <- 0 until numMessages)
+    for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
+      if (i == 0)
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+      else
+        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+    }
     log.close()
   }
 
@@ -602,27 +704,37 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testIndexResizingAtTruncation() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
-    val setSize = set.sizeInBytes
+    val setSize = TestUtils.singleMessageSet(payload = "test".getBytes).sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, (setSize - 1): java.lang.Integer)
     val config = LogConfig(logProps)
     val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
     for (i<- 1 to msgPerSeg)
-      log.append(set)
+      log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i))
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
+    time.sleep(msgPerSeg)
     for (i<- 1 to msgPerSeg)
-      log.append(set)
+      log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i))
     assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
-    assertEquals("The index of the first segment should be trimmed to empty", 0, log.logSegments.toList.head.index.maxEntries)
+    val expectedEntries = msgPerSeg - 1
+
+    assertEquals(s"The index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.index.maxEntries)
+    assertEquals(s"The time index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.timeIndex.maxEntries)
+
     log.truncateTo(0)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
     assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList.head.index.maxEntries)
+    assertEquals("The time index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/12, log.logSegments.toList.head.timeIndex.maxEntries)
+
+    time.sleep(msgPerSeg)
     for (i<- 1 to msgPerSeg)
-      log.append(set)
+      log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i))
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
   }
 
@@ -632,7 +744,9 @@ class LogTest extends JUnitSuite {
   @Test
   def testBogusIndexSegmentsAreRemoved() {
     val bogusIndex1 = Log.indexFilename(logDir, 0)
+    val bogusTimeIndex1 = Log.timeIndexFilename(logDir, 0)
     val bogusIndex2 = Log.indexFilename(logDir, 5)
+    val bogusTimeIndex2 = Log.timeIndexFilename(logDir, 5)
 
     val set = TestUtils.singleMessageSet("test".getBytes)
     val logProps = new Properties()
@@ -646,7 +760,9 @@ class LogTest extends JUnitSuite {
                       time)
 
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
+    assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0)
     assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
+    assertFalse("The second time index file should have been deleted.", bogusTimeIndex2.exists)
 
     // check that we can append to the log
     for(i <- 0 until 10)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
new file mode 100644
index 0000000..bc60c72
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+
+import kafka.common.InvalidOffsetException
+import kafka.utils.TestUtils
+import org.junit.{Test, After, Before}
+import org.junit.Assert.{assertEquals}
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Unit test for time index.
+ */
+class TimeIndexTest extends JUnitSuite {
+  var idx: TimeIndex = null
+  val maxEntries = 30
+  val baseOffset = 45L
+
+  @Before
+  def setup() {
+    this.idx = new TimeIndex(file = nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12)
+  }
+
+  @After
+  def teardown() {
+    if(this.idx != null)
+      this.idx.file.delete()
+  }
+
+  @Test
+  def testLookUp() {
+    // Empty time index
+    assertEquals(TimestampOffset(-1L, baseOffset), idx.lookup(100L))
+
+    // Add several time index entries.
+    appendEntries(maxEntries - 1)
+
+    // look for timestamp smaller than the earliest entry
+    assertEquals(TimestampOffset(-1L, baseOffset), idx.lookup(9))
+    // look for timestamp in the middle of two entries.
+    assertEquals(TimestampOffset(20L, 65L), idx.lookup(25))
+    // look for timestamp same as the one in the entry
+    assertEquals(TimestampOffset(30L, 75L), idx.lookup(30))
+  }
+
+  @Test
+  def testTruncate() {
+    appendEntries(maxEntries - 1)
+    idx.truncate()
+    assertEquals(0, idx.entries)
+
+    appendEntries(maxEntries - 1)
+    idx.truncateTo(10 + baseOffset)
+    assertEquals(0, idx.entries)
+  }
+
+  @Test
+  def testAppend() {
+    appendEntries(maxEntries - 1)
+    intercept[IllegalArgumentException] {
+      idx.maybeAppend(10000L, 1000L)
+    }
+    intercept[InvalidOffsetException] {
+      idx.maybeAppend(10000L, (maxEntries - 2) * 10, true)
+    }
+    idx.maybeAppend(10000L, 1000L, true)
+  }
+
+  private def appendEntries(numEntries: Int) {
+    for (i <- 1 to numEntries)
+      idx.maybeAppend(i * 10, i * 10 + baseOffset)
+  }
+
+  def nonExistantTempFile(): File = {
+    val file = TestUtils.tempFile()
+    file.delete()
+    file
+  }
+}
+
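
TimeIndexTest doubles as a usage summary of the new TimeIndex class. A minimal sketch, assuming only the calls the test exercises (maybeAppend(timestamp, offset), lookup(timestamp) and truncateTo(offset)) and with the behaviour inferred from the assertions above:

    // Mirror the test's nonExistantTempFile(): start from a file that does not yet exist.
    val file = TestUtils.tempFile()
    file.delete()
    val idx = new TimeIndex(file = file, baseOffset = 45L, maxIndexSize = 30 * 12)

    idx.maybeAppend(10L, 50L)                     // (timestamp, offset)
    idx.maybeAppend(20L, 55L)

    // Largest entry whose timestamp <= 15, here TimestampOffset(10L, 50L).
    val found = idx.lookup(15L)

    idx.truncateTo(55L)                           // drops entries with offset >= 55
    idx.file.delete()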

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 4810009..39eb84c 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -152,56 +152,69 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
 
   @Test
   def testLogAppendTime() {
-    val startTime = System.currentTimeMillis()
+    val now = System.currentTimeMillis()
     // The timestamps should be overwritten
     val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = NoCompressionCodec)
     val compressedMessagesWithRecompresion = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec)
     val compressedMessagesWithoutRecompression =
-      getMessages(magicValue = Message.MagicValue_V1, timestamp = -1L, codec = DefaultCompressionCodec)
-
-    val (validatedMessages, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                           now = System.currentTimeMillis(),
-                                                                           sourceCodec = NoCompressionCodec,
-                                                                           targetCodec = NoCompressionCodec,
-                                                                           messageFormatVersion = 1,
-                                                                           messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                                           messageTimestampDiffMaxMs = 1000L)
-
-    val (validatedCompressedMessages, _) =
+      getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = DefaultCompressionCodec)
+
+    val validatingResults = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                                      now = now,
+                                                                      sourceCodec = NoCompressionCodec,
+                                                                      targetCodec = NoCompressionCodec,
+                                                                      messageFormatVersion = 1,
+                                                                      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+                                                                      messageTimestampDiffMaxMs = 1000L)
+    val validatedMessages = validatingResults.validatedMessages
+
+    val validatingCompressedMessagesResults =
       compressedMessagesWithRecompresion.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                          now = System.currentTimeMillis(),
+                                                                          now = now,
                                                                           sourceCodec = DefaultCompressionCodec,
                                                                           targetCodec = DefaultCompressionCodec,
                                                                           messageFormatVersion = 1,
                                                                           messageTimestampType = TimestampType.LOG_APPEND_TIME,
                                                                           messageTimestampDiffMaxMs = 1000L)
+    val validatedCompressedMessages = validatingCompressedMessagesResults.validatedMessages
 
-    val (validatedCompressedMessagesWithoutRecompression, _) =
+    val validatingCompressedMessagesWithoutRecompressionResults =
       compressedMessagesWithoutRecompression.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                              now = System.currentTimeMillis(),
+                                                                              now = now,
                                                                               sourceCodec = DefaultCompressionCodec,
                                                                               targetCodec = DefaultCompressionCodec,
                                                                               messageFormatVersion = 1,
                                                                               messageTimestampType = TimestampType.LOG_APPEND_TIME,
                                                                               messageTimestampDiffMaxMs = 1000L)
 
-    val now = System.currentTimeMillis()
+    val validatedCompressedMessagesWithoutRecompression = validatingCompressedMessagesWithoutRecompressionResults.validatedMessages
+
     assertEquals("message set size should not change", messages.size, validatedMessages.size)
     validatedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
+    assertEquals(s"Max timestamp should be $now", now, validatingResults.maxTimestamp)
+    assertEquals(s"The offset of max timestamp should be 0", 0, validatingResults.offsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
 
     assertEquals("message set size should not change", compressedMessagesWithRecompresion.size, validatedCompressedMessages.size)
     validatedCompressedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
     assertTrue("MessageSet should still valid", validatedCompressedMessages.shallowIterator.next().message.isValid)
+    assertEquals(s"Max timestamp should be $now", now, validatingCompressedMessagesResults.maxTimestamp)
+    assertEquals(s"The offset of max timestamp should be ${compressedMessagesWithRecompresion.size - 1}",
+      compressedMessagesWithRecompresion.size - 1, validatingCompressedMessagesResults.offsetOfMaxTimestamp)
+    assertTrue("Message size may have been changed", validatingCompressedMessagesResults.messageSizeMaybeChanged)
 
     assertEquals("message set size should not change", compressedMessagesWithoutRecompression.size,
       validatedCompressedMessagesWithoutRecompression.size)
     validatedCompressedMessagesWithoutRecompression.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
     assertTrue("MessageSet should still valid", validatedCompressedMessagesWithoutRecompression.shallowIterator.next().message.isValid)
+    assertEquals(s"Max timestamp should be $now", now, validatingCompressedMessagesWithoutRecompressionResults.maxTimestamp)
+    assertEquals(s"The offset of max timestamp should be ${compressedMessagesWithoutRecompression.size - 1}",
+      compressedMessagesWithoutRecompression.size - 1, validatingCompressedMessagesWithoutRecompressionResults.offsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatingCompressedMessagesWithoutRecompressionResults.messageSizeMaybeChanged)
 
     def validateLogAppendTime(message: Message) {
       message.ensureValid()
-      assertTrue(s"Timestamp of message $message should be between $startTime and $now",
-        message.timestamp >= startTime && message.timestamp <= now)
+      assertEquals(s"Timestamp of message $message should be $now", now, message.timestamp)
       assertEquals(TimestampType.LOG_APPEND_TIME, message.timestampType)
     }
   }
@@ -209,18 +222,28 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testCreateTime() {
     val now = System.currentTimeMillis()
-    val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = NoCompressionCodec)
-    val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = DefaultCompressionCodec)
-
-    val (validatedMessages, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                           now = System.currentTimeMillis(),
-                                                                           sourceCodec = NoCompressionCodec,
-                                                                           targetCodec = NoCompressionCodec,
-                                                                           messageFormatVersion = 1,
-                                                                           messageTimestampType = TimestampType.CREATE_TIME,
-                                                                           messageTimestampDiffMaxMs = 1000L)
-
-    val (validatedCompressedMessages, _) =
+    val timestampSeq = Seq(now - 1, now + 1, now)
+    val messages =
+      new ByteBufferMessageSet(NoCompressionCodec,
+                               new Message("hello".getBytes, timestamp = timestampSeq(0), magicValue = Message.MagicValue_V1),
+                               new Message("there".getBytes, timestamp = timestampSeq(1), magicValue = Message.MagicValue_V1),
+                               new Message("beautiful".getBytes, timestamp = timestampSeq(2), magicValue = Message.MagicValue_V1))
+    val compressedMessages =
+      new ByteBufferMessageSet(DefaultCompressionCodec,
+                               new Message("hello".getBytes, timestamp = timestampSeq(0), magicValue = Message.MagicValue_V1),
+                               new Message("there".getBytes, timestamp = timestampSeq(1), magicValue = Message.MagicValue_V1),
+                               new Message("beautiful".getBytes, timestamp = timestampSeq(2), magicValue = Message.MagicValue_V1))
+
+    val validatingResults = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                                      now = System.currentTimeMillis(),
+                                                                      sourceCodec = NoCompressionCodec,
+                                                                      targetCodec = NoCompressionCodec,
+                                                                      messageFormatVersion = 1,
+                                                                      messageTimestampType = TimestampType.CREATE_TIME,
+                                                                      messageTimestampDiffMaxMs = 1000L)
+    val validatedMessages = validatingResults.validatedMessages
+
+    val validatingCompressedMessagesResults =
       compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
                                                           now = System.currentTimeMillis(),
                                                           sourceCodec = DefaultCompressionCodec,
@@ -228,17 +251,29 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                           messageFormatVersion = 1,
                                                           messageTimestampType = TimestampType.CREATE_TIME,
                                                           messageTimestampDiffMaxMs = 1000L)
+    val validatedCompressedMessages = validatingCompressedMessagesResults.validatedMessages
 
+    var i = 0
     for (messageAndOffset <- validatedMessages) {
       messageAndOffset.message.ensureValid()
-      assertEquals(messageAndOffset.message.timestamp, now)
+      assertEquals(messageAndOffset.message.timestamp, timestampSeq(i))
       assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME)
+      i += 1
     }
+    assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
+    assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.offsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
+    i = 0
     for (messageAndOffset <- validatedCompressedMessages) {
       messageAndOffset.message.ensureValid()
-      assertEquals(messageAndOffset.message.timestamp, now)
+      assertEquals(messageAndOffset.message.timestamp, timestampSeq(i))
       assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME)
+      i += 1
     }
+    assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingCompressedMessagesResults.maxTimestamp)
+    assertEquals(s"Offset of max timestamp should be ${validatedCompressedMessages.size - 1}",
+      validatedCompressedMessages.size - 1, validatingCompressedMessagesResults.offsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatingCompressedMessagesResults.messageSizeMaybeChanged)
   }
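
For readers following the change from the old (messageSet, sizeChanged) pair, the fields read in the assertions above summarise the new return value. A minimal sketch of consuming a ValidationAndOffsetAssignResult, assuming a messages set built as in testCreateTime:

    val result = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
                                                           now = System.currentTimeMillis(),
                                                           sourceCodec = NoCompressionCodec,
                                                           targetCodec = NoCompressionCodec,
                                                           messageFormatVersion = 1,
                                                           messageTimestampType = TimestampType.CREATE_TIME,
                                                           messageTimestampDiffMaxMs = 1000L)

    val validated = result.validatedMessages         // the validated (possibly converted) message set
    val maxTs     = result.maxTimestamp              // largest timestamp across the set
    val maxTsAt   = result.offsetOfMaxTimestamp      // offset of the shallow message carrying it
    val resized   = result.messageSizeMaybeChanged   // whether message sizes may have changed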
 
   @Test
@@ -287,7 +322,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                            targetCodec = NoCompressionCodec,
                                                            messageFormatVersion = 0,
                                                            messageTimestampType = TimestampType.CREATE_TIME,
-                                                           messageTimestampDiffMaxMs = 1000L)._1, offset)
+                                                           messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
 
     // check compressed messages
     checkOffsets(compressedMessages, 0)
@@ -297,7 +332,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                                      targetCodec = DefaultCompressionCodec,
                                                                      messageFormatVersion = 0,
                                                                      messageTimestampType = TimestampType.CREATE_TIME,
-                                                                     messageTimestampDiffMaxMs = 1000L)._1, offset)
+                                                                     messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
 
   }
 
@@ -310,22 +345,22 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     // check uncompressed offsets
     checkOffsets(messages, 0)
     val offset = 1234567
-    val (messageWithOffset, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                                           now = System.currentTimeMillis(),
-                                                                           sourceCodec = NoCompressionCodec,
-                                                                           targetCodec = NoCompressionCodec,
-                                                                           messageTimestampType = TimestampType.CREATE_TIME,
-                                                                           messageTimestampDiffMaxMs = 5000L)
+    val messageWithOffset = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                                      now = System.currentTimeMillis(),
+                                                                      sourceCodec = NoCompressionCodec,
+                                                                      targetCodec = NoCompressionCodec,
+                                                                      messageTimestampType = TimestampType.CREATE_TIME,
+                                                                      messageTimestampDiffMaxMs = 5000L).validatedMessages
     checkOffsets(messageWithOffset, offset)
 
     // check compressed messages
     checkOffsets(compressedMessages, 0)
-    val (compressedMessagesWithOffset, _) = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                                                                now = System.currentTimeMillis(),
-                                                                                                sourceCodec = DefaultCompressionCodec,
-                                                                                                targetCodec = DefaultCompressionCodec,
-                                                                                                messageTimestampType = TimestampType.CREATE_TIME,
-                                                                                                messageTimestampDiffMaxMs = 5000L)
+    val compressedMessagesWithOffset = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                                                           now = System.currentTimeMillis(),
+                                                                                           sourceCodec = DefaultCompressionCodec,
+                                                                                           targetCodec = DefaultCompressionCodec,
+                                                                                           messageTimestampType = TimestampType.CREATE_TIME,
+                                                                                           messageTimestampDiffMaxMs = 5000L).validatedMessages
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
@@ -343,7 +378,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                              targetCodec = NoCompressionCodec,
                                                              messageFormatVersion = 1,
                                                              messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                             messageTimestampDiffMaxMs = 1000L)._1, offset)
+                                                             messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
 
     // check compressed messages
     checkOffsets(compressedMessagesV0, 0)
@@ -353,7 +388,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                                        targetCodec = DefaultCompressionCodec,
                                                                        messageFormatVersion = 1,
                                                                        messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                                       messageTimestampDiffMaxMs = 1000L)._1, offset)
+                                                                       messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
 
     // Check down conversion
     val now = System.currentTimeMillis()
@@ -368,7 +403,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                              targetCodec = NoCompressionCodec,
                                                              messageFormatVersion = 0,
                                                              messageTimestampType = TimestampType.CREATE_TIME,
-                                                             messageTimestampDiffMaxMs = 5000L)._1, offset)
+                                                             messageTimestampDiffMaxMs = 5000L).validatedMessages, offset)
 
     // check compressed messages
     checkOffsets(compressedMessagesV1, 0)
@@ -378,7 +413,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                                        targetCodec = DefaultCompressionCodec,
                                                                        messageFormatVersion = 0,
                                                                        messageTimestampType = TimestampType.CREATE_TIME,
-                                                                       messageTimestampDiffMaxMs = 5000L)._1, offset)
+                                                                       messageTimestampDiffMaxMs = 5000L).validatedMessages, offset)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 05b84ef..131a24a 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -274,8 +274,9 @@ object TestUtils extends Logging {
   def singleMessageSet(payload: Array[Byte],
                        codec: CompressionCodec = NoCompressionCodec,
                        key: Array[Byte] = null,
+                       timestamp: Long = Message.NoTimestamp,
                        magicValue: Byte = Message.CurrentMagicValue) =
-    new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key, Message.NoTimestamp, magicValue))
+    new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key, timestamp, magicValue))
 
   /**
    * Generate an array of random bytes

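A short usage note on the new parameter, as a sketch mirroring the LogTest calls earlier in this patch:

    // Without a timestamp the message keeps Message.NoTimestamp, as before.
    val untimed = TestUtils.singleMessageSet("test".getBytes)

    // With an explicit create timestamp, as the time-index tests above rely on.
    val timed = TestUtils.singleMessageSet(payload = "test".getBytes,
                                           timestamp = System.currentTimeMillis())
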
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index dfd20f4..eef21cf 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -15,10 +15,24 @@
  limitations under the License.
 -->
 
+
+
 <h3><a id="upgrade" href="#upgrade">1.5 Upgrading From Previous Versions</a></h3>
 
-<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
+<h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.10.0.X to 0.10.1.0</a></h4>
+0.10.1.0 is compatible with 0.10.0.X in terms of wire protocol. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it.
+However, please review the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrading.
 
+<h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a></h5>
+<ul>
+    <li> The log retention time is no longer based on the last modified time of the log segments. Instead it is based on the largest timestamp of the messages in a log segment.</li>
+    <li> The log rolling time no longer depends on the log segment create time. Instead, it is now based on the timestamp of the first message in a log segment, i.e. if the timestamp of the first message in the segment is T, the log will be rolled at T + log.roll.ms.</li>
+    <li> The number of open file handles will increase by ~33% compared to 0.10.0 because of the addition of a time index file for each segment.</li>
+    <li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes to avoid potential frequent log rolling.</li>
+    <li> Due to the increased number of index files, on brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could take longer. Based on our experiment, setting num.recovery.threads.per.data.dir to one may reduce the log loading time.</li>
+</ul>
+
+<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
 <ul>
     <li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li>
 </ul>

