kafka-commits mailing list archives

From: ij...@apache.org
Subject: [04/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
Date: Fri, 24 Mar 2017 19:43:57 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 2827a2f..75b1f24 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -54,7 +54,7 @@ class ConsumerFetcherThread(name: String,
     replicaId(Request.OrdinaryConsumerId).
     maxWait(config.fetchWaitMaxMs).
     minBytes(config.fetchMinBytes).
-    requestVersion(kafka.api.FetchRequest.CurrentVersion)
+    requestVersion(3) // for now, the old consumer is pinned to the old message format through the fetch request
 
   override def initiateShutdown(): Boolean = {
     val justShutdown = super.initiateShutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index a2fee6b..740fff4 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -24,7 +24,7 @@ import kafka.cluster.Broker
 import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
@@ -121,7 +121,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         config.requestTimeoutMs,
         time,
-        false
+        false,
+        new ApiVersions
       )
     }
     val threadName = threadNamePrefix match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 1735dc8..70f4724 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -423,10 +423,10 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def doCommitOffsets(group: GroupMetadata,
-                      memberId: String,
-                      generationId: Int,
-                      offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                      responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
+                              memberId: String,
+                              generationId: Int,
+                              offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                              responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
     var delayedOffsetStore: Option[DelayedStore] = None
 
     group synchronized {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 6c8f252..9d62924 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 
 import scala.collection.JavaConverters._
 import scala.collection._
+import scala.collection.mutable.ListBuffer
 
 class GroupMetadataManager(val brokerId: Int,
                            val interBrokerProtocolVersion: ApiVersion,
@@ -146,12 +147,19 @@ class GroupMetadataManager(val brokerId: Int,
         // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
         val timestampType = TimestampType.CREATE_TIME
         val timestamp = time.milliseconds()
-        val record = Record.create(magicValue, timestampType, timestamp,
-          GroupMetadataManager.groupMetadataKey(group.groupId),
-          GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion))
+        val key = GroupMetadataManager.groupMetadataKey(group.groupId)
+        val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion)
+
+        val records = {
+          val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
+            Seq(new SimpleRecord(timestamp, key, value)).asJava))
+          val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
+          builder.append(timestamp, key, value)
+          builder.build()
+        }
 
         val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
-        val groupMetadataRecords = Map(groupMetadataPartition -> MemoryRecords.withRecords(timestampType, compressionType, record))
+        val groupMetadataRecords = Map(groupMetadataPartition -> records)
         val generationId = group.generationId
 
         // set the callback function to insert the created group into cache after log append completed
@@ -235,34 +243,41 @@ class GroupMetadataManager(val brokerId: Int,
     }
 
     // construct the message set to append
-    getMagic(partitionFor(group.groupId)) match {
-      case Some(magicValue) =>
-        // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
-        val timestampType = TimestampType.CREATE_TIME
-        val timestamp = time.milliseconds()
-        val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
-          Record.create(magicValue, timestampType, timestamp,
-            GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition),
-            GroupMetadataManager.offsetCommitValue(offsetAndMetadata))
-        }.toSeq
-
-        val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
-
-        val entries = Map(offsetTopicPartition -> MemoryRecords.withRecords(timestampType, compressionType, records:_*))
-
-        // set the callback function to insert offsets into cache after log append completed
-        def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
-          // the append response should only contain the topics partition
-          if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
-            throw new IllegalStateException("Append status %s should only have one partition %s"
-              .format(responseStatus, offsetTopicPartition))
-
-          // construct the commit response status and insert
-          // the offset and metadata to cache if the append status has no error
-          val status = responseStatus(offsetTopicPartition)
+    if (filteredOffsetMetadata.isEmpty) {
+      // compute the final error codes for the commit response
+      val commitStatus = offsetMetadata.mapValues(_ => Errors.OFFSET_METADATA_TOO_LARGE)
+      responseCallback(commitStatus)
+      None
+    } else {
+      getMagic(partitionFor(group.groupId)) match {
+        case Some(magicValue) =>
+          // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+          val timestampType = TimestampType.CREATE_TIME
+          val timestamp = time.milliseconds()
 
-          val response =
-            group synchronized {
+          val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
+            val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
+            val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+            new SimpleRecord(timestamp, key, value)
+          }
+          val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+          val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
+          val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
+          records.foreach(builder.append)
+          val entries = Map(offsetTopicPartition -> builder.build())
+
+          // set the callback function to insert offsets into cache after log append completed
+          def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+            // the append response should only contain the topics partition
+            if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
+              throw new IllegalStateException("Append status %s should only have one partition %s"
+                .format(responseStatus, offsetTopicPartition))
+
+            // construct the commit response status and insert
+            // the offset and metadata to cache if the append status has no error
+            val status = responseStatus(offsetTopicPartition)
+
+            val responseError = group synchronized {
               if (status.error == Errors.NONE) {
                 if (!group.is(Dead)) {
                   filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
@@ -281,7 +296,7 @@ class GroupMetadataManager(val brokerId: Int,
                   s"with generation $generationId failed when appending to log due to ${status.error.exceptionName}")
 
                 // transform the log append error code to the corresponding the commit status error code
-                val responseError = status.error match {
+                status.error match {
                   case Errors.UNKNOWN_TOPIC_OR_PARTITION
                        | Errors.NOT_ENOUGH_REPLICAS
                        | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
@@ -297,35 +312,34 @@ class GroupMetadataManager(val brokerId: Int,
 
                   case other => other
                 }
-
-                responseError
               }
             }
 
-          // compute the final error codes for the commit response
-          val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
-            if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
-              (topicPartition, response)
-            else
-              (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-          }
+            // compute the final error codes for the commit response
+            val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
+              if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+                (topicPartition, responseError)
+              else
+                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
+            }
 
-          // finally trigger the callback logic passed from the API layer
-          responseCallback(commitStatus)
-        }
+            // finally trigger the callback logic passed from the API layer
+            responseCallback(commitStatus)
+          }
 
-        group synchronized {
-          group.prepareOffsetCommit(offsetMetadata)
-        }
+          group synchronized {
+            group.prepareOffsetCommit(offsetMetadata)
+          }
 
-        Some(DelayedStore(entries, putCacheCallback))
+          Some(DelayedStore(entries, putCacheCallback))
 
-      case None =>
-        val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
-          (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP)
-        }
-        responseCallback(commitStatus)
-        None
+        case None =>
+          val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
+            (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP)
+          }
+          responseCallback(commitStatus)
+          None
+      }
     }
   }
 
@@ -427,75 +441,76 @@ class GroupMetadataManager(val brokerId: Int,
             .records.asInstanceOf[FileRecords]
           val bufferRead = fileRecords.readInto(buffer, 0)
 
-          MemoryRecords.readableRecords(bufferRead).deepEntries.asScala.foreach { entry =>
-            val record = entry.record
-            require(record.hasKey, "Group metadata/offset entry key should not be null")
-
-            GroupMetadataManager.readMessageKey(record.key) match {
-              case offsetKey: OffsetKey =>
-                // load offset
-                val key = offsetKey.key
-                if (record.hasNullValue) {
-                  loadedOffsets.remove(key)
-                  removedOffsets.add(key)
-                } else {
-                  val value = GroupMetadataManager.readOffsetMessageValue(record.value)
-                  loadedOffsets.put(key, value)
-                  removedOffsets.remove(key)
-                }
+          MemoryRecords.readableRecords(bufferRead).batches.asScala.foreach { batch =>
+            for (record <- batch.asScala) {
+              require(record.hasKey, "Group metadata/offset entry key should not be null")
+              GroupMetadataManager.readMessageKey(record.key) match {
+
+                case offsetKey: OffsetKey =>
+                  // load offset
+                  val key = offsetKey.key
+                  if (!record.hasValue) {
+                    loadedOffsets.remove(key)
+                    removedOffsets.add(key)
+                  } else {
+                    val value = GroupMetadataManager.readOffsetMessageValue(record.value)
+                    loadedOffsets.put(key, value)
+                    removedOffsets.remove(key)
+                  }
 
-              case groupMetadataKey: GroupMetadataKey =>
-                // load group metadata
-                val groupId = groupMetadataKey.key
-                val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
-                if (groupMetadata != null) {
-                  trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}")
-                  removedGroups.remove(groupId)
-                  loadedGroups.put(groupId, groupMetadata)
-                } else {
-                  loadedGroups.remove(groupId)
-                  removedGroups.add(groupId)
-                }
+                case groupMetadataKey: GroupMetadataKey =>
+                  // load group metadata
+                  val groupId = groupMetadataKey.key
+                  val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+                  if (groupMetadata != null) {
+                    trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}")
+                    removedGroups.remove(groupId)
+                    loadedGroups.put(groupId, groupMetadata)
+                  } else {
+                    loadedGroups.remove(groupId)
+                    removedGroups.add(groupId)
+                  }
 
-              case unknownKey =>
-                throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
-            }
+                case unknownKey =>
+                  throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+              }
 
-            currOffset = entry.nextOffset
+              currOffset = batch.nextOffset
+            }
           }
-        }
 
-        val (groupOffsets, emptyGroupOffsets) = loadedOffsets
-          .groupBy(_._1.group)
-          .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)} )
-          .partition { case (group, _) => loadedGroups.contains(group) }
+          val (groupOffsets, emptyGroupOffsets) = loadedOffsets
+            .groupBy(_._1.group)
+            .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
+            .partition { case (group, _) => loadedGroups.contains(group) }
 
-        loadedGroups.values.foreach { group =>
-          val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
-          loadGroup(group, offsets)
-          onGroupLoaded(group)
-        }
+          loadedGroups.values.foreach { group =>
+            val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
+            loadGroup(group, offsets)
+            onGroupLoaded(group)
+          }
 
-        // load groups which store offsets in kafka, but which have no active members and thus no group
-        // metadata stored in the log
-        emptyGroupOffsets.foreach { case (groupId, offsets) =>
-          val group = new GroupMetadata(groupId)
-          loadGroup(group, offsets)
-          onGroupLoaded(group)
-        }
+          // load groups which store offsets in kafka, but which have no active members and thus no group
+          // metadata stored in the log
+          emptyGroupOffsets.foreach { case (groupId, offsets) =>
+            val group = new GroupMetadata(groupId)
+            loadGroup(group, offsets)
+            onGroupLoaded(group)
+          }
 
-        removedGroups.foreach { groupId =>
-          // if the cache already contains a group which should be removed, raise an error. Note that it
-          // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
-          // offset storage (i.e. by "simple" consumers)
-          if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
-            throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
-              s"loading partition $topicPartition")
-        }
+          removedGroups.foreach { groupId =>
+            // if the cache already contains a group which should be removed, raise an error. Note that it
+            // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
+            // offset storage (i.e. by "simple" consumers)
+            if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
+              throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
+                s"loading partition $topicPartition")
+          }
 
-        if (!shuttingDown.get())
-          info("Finished loading offsets from %s in %d milliseconds."
-            .format(topicPartition, time.milliseconds() - startMs))
+          if (!shuttingDown.get())
+            info("Finished loading offsets from %s in %d milliseconds."
+              .format(topicPartition, time.milliseconds() - startMs))
+        }
     }
   }
 
@@ -590,11 +605,12 @@ class GroupMetadataManager(val brokerId: Int,
 
           val partitionOpt = replicaManager.getPartition(appendPartition)
           partitionOpt.foreach { partition =>
-            val tombstones = removedOffsets.map { case (topicPartition, offsetAndMetadata) =>
+            val tombstones = ListBuffer.empty[SimpleRecord]
+            removedOffsets.foreach { case (topicPartition, offsetAndMetadata) =>
               trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
               val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
-              Record.create(magicValue, timestampType, timestamp, commitKey, null)
-            }.toBuffer
+              tombstones += new SimpleRecord(timestamp, commitKey, null)
+            }
             trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.")
 
             // We avoid writing the tombstone when the generationId is 0, since this group is only using
@@ -603,7 +619,8 @@ class GroupMetadataManager(val brokerId: Int,
               // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
               // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
               // retry removing this group.
-              tombstones += Record.create(magicValue, timestampType, timestamp, GroupMetadataManager.groupMetadataKey(group.groupId), null)
+              val groupMetadataKey = GroupMetadataManager.groupMetadataKey(group.groupId)
+              tombstones += new SimpleRecord(timestamp, groupMetadataKey, null)
               trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.")
             }
 
@@ -611,13 +628,16 @@ class GroupMetadataManager(val brokerId: Int,
               try {
                 // do not need to require acks since even if the tombstone is lost,
                 // it will be appended again in the next purge cycle
-                partition.appendRecordsToLeader(MemoryRecords.withRecords(timestampType, compressionType, tombstones: _*))
+                val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones: _*)
+                partition.appendRecordsToLeader(records)
                 offsetsRemoved += removedOffsets.size
-                trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted offsets and/or metadata for group $groupId")
+                trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
+                  s"offsets and/or metadata for group $groupId")
               } catch {
                 case t: Throwable =>
-                  error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired/deleted offsets and/or metadata for group $groupId.", t)
-                  // ignore and continue
+                  error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
+                    s"offsets and/or metadata for group $groupId.", t)
+                // ignore and continue
               }
             }
           }
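
[Editorial sketch] The GroupMetadataManager hunks above replace Record.create and MemoryRecords.withRecords(timestampType, ...) with the SimpleRecord plus MemoryRecords.builder pattern. A minimal standalone sketch of that pattern, assuming byte-array keys and values; the helper name and parameters are illustrative, not part of the patch:

  import java.nio.ByteBuffer
  import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, SimpleRecord, TimestampType}
  import scala.collection.JavaConverters._

  // Build a single-record MemoryRecords the way the patched storeGroup/prepareStoreOffsets do:
  // size the buffer with the estimator, append through the builder, then build.
  def buildRecords(magicValue: Byte, compressionType: CompressionType, timestamp: Long,
                   key: Array[Byte], value: Array[Byte]): MemoryRecords = {
    val records = Seq(new SimpleRecord(timestamp, key, value))
    val buffer = ByteBuffer.allocate(
      AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
    val builder = MemoryRecords.builder(buffer, magicValue, compressionType, TimestampType.CREATE_TIME, 0L)
    records.foreach(builder.append)
    builder.build()
  }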

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 417122c..d2cac23 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -39,7 +39,7 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCod
 import org.apache.kafka.common.TopicPartition
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Record.NO_TIMESTAMP, -1L, Record.NO_TIMESTAMP,
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
     NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
@@ -332,7 +332,7 @@ class Log(@volatile var dir: File,
    * Close this log
    */
   def close() {
-    debug("Closing log " + name)
+    debug(s"Closing log $name")
     lock synchronized {
       logSegments.foreach(_.close())
     }
@@ -391,27 +391,26 @@ class Log(@volatile var dir: File,
           // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
           // format conversion)
           if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
-            for (logEntry <- validRecords.shallowEntries.asScala) {
-              if (logEntry.sizeInBytes > config.maxMessageSize) {
+            for (batch <- validRecords.batches.asScala) {
+              if (batch.sizeInBytes > config.maxMessageSize) {
                 // we record the original message set size instead of the trimmed size
                 // to be consistent with pre-compression bytesRejectedRate recording
                 BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                 BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-                throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
-                  .format(logEntry.sizeInBytes, config.maxMessageSize))
+                throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
+                  .format(batch.sizeInBytes, config.maxMessageSize))
               }
             }
           }
-
         } else {
           // we are taking the offsets we are given
           if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
-            throw new IllegalArgumentException("Out of order offsets found in " + records.deepEntries.asScala.map(_.offset))
+            throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
         }
 
         // check messages set size may be exceed config.segmentSize
         if (validRecords.sizeInBytes > config.segmentSize) {
-          throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
+          throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
             .format(validRecords.sizeInBytes, config.segmentSize))
         }
 
@@ -468,39 +467,42 @@ class Log(@volatile var dir: File,
     var lastOffset = -1L
     var sourceCodec: CompressionCodec = NoCompressionCodec
     var monotonic = true
-    var maxTimestamp = Record.NO_TIMESTAMP
+    var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
-    for (entry <- records.shallowEntries.asScala) {
-      // update the first offset if on the first message
-      if(firstOffset < 0)
-        firstOffset = entry.offset
+
+    for (batch <- records.batches.asScala) {
+      // update the first offset if on the first message. For magic versions older than 2, we use the last offset
+      // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
+      // For magic version 2, we can get the first offset directly from the batch header.
+      if (firstOffset < 0)
+        firstOffset = if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) batch.baseOffset else batch.lastOffset
+
       // check that offsets are monotonically increasing
-      if(lastOffset >= entry.offset)
+      if (lastOffset >= batch.lastOffset)
         monotonic = false
       // update the last offset seen
-      lastOffset = entry.offset
-
-      val record = entry.record
+      lastOffset = batch.lastOffset
 
       // Check if the message sizes are valid.
-      val messageSize = entry.sizeInBytes
-      if(messageSize > config.maxMessageSize) {
+      val batchSize = batch.sizeInBytes
+      if (batchSize > config.maxMessageSize) {
         BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
         BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-        throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
-          .format(messageSize, config.maxMessageSize))
+        throw new RecordTooLargeException(s"Message size is $batchSize bytes which exceeds the maximum configured " +
+          s"message size of ${config.maxMessageSize}.")
       }
 
       // check the validity of the message by checking CRC
-      record.ensureValid()
-      if (record.timestamp > maxTimestamp) {
-        maxTimestamp = record.timestamp
+      batch.ensureValid()
+
+      if (batch.maxTimestamp > maxTimestamp) {
+        maxTimestamp = batch.maxTimestamp
         offsetOfMaxTimestamp = lastOffset
       }
       shallowMessageCount += 1
-      validBytesCount += messageSize
+      validBytesCount += batchSize
 
-      val messageCodec = CompressionCodec.getCompressionCodec(record.compressionType.id)
+      val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
       if (messageCodec != NoCompressionCodec)
         sourceCodec = messageCodec
     }
@@ -508,7 +510,7 @@ class Log(@volatile var dir: File,
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
 
-    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Record.NO_TIMESTAMP, sourceCodec,
+    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, sourceCodec,
       targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }
 
@@ -624,9 +626,9 @@ class Log(@volatile var dir: File,
     val segmentsCopy = logSegments.toBuffer
     // For the earliest and latest, we do not need to return the timestamp.
     if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
-        return Some(TimestampOffset(Record.NO_TIMESTAMP, segmentsCopy.head.baseOffset))
+        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, segmentsCopy.head.baseOffset))
     else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
-        return Some(TimestampOffset(Record.NO_TIMESTAMP, logEndOffset))
+        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
 
     val targetSeg = {
       // Get all the segments whose largest timestamp is smaller than target timestamp
@@ -1152,4 +1154,3 @@ object Log {
   }
 
 }
-
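
[Editorial sketch] The Log.scala hunks above move analyzeAndValidateRecords from shallow-entry iteration to batch-level iteration. A hedged, self-contained illustration of the offset bookkeeping it now performs (the helper below is illustrative only, not the patch's code):

  import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
  import scala.collection.JavaConverters._

  // For magic v2 batches the first offset comes straight from the batch header (baseOffset);
  // for older formats we fall back to lastOffset so the wrapper need not be decompressed.
  def firstAndLastOffsets(records: MemoryRecords): (Long, Long) = {
    var firstOffset = -1L
    var lastOffset = -1L
    for (batch <- records.batches.asScala) {
      if (firstOffset < 0)
        firstOffset = if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) batch.baseOffset else batch.lastOffset
      lastOffset = batch.lastOffset
    }
    (firstOffset, lastOffset)
  }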

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 5f06a73..8ddeca9 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -26,10 +26,10 @@ import com.yammer.metrics.core.Gauge
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import org.apache.kafka.common.record.{FileRecords, LogEntry, MemoryRecords}
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record}
 import org.apache.kafka.common.utils.Time
-import MemoryRecords.LogEntryFilter
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 
 import scala.collection._
 import JavaConverters._
@@ -450,9 +450,8 @@ private[log] class Cleaner(val id: Int,
                              retainDeletes: Boolean,
                              maxLogMessageSize: Int,
                              stats: CleanerStats) {
-
-    val logCleanerFilter = new LogEntryFilter {
-      def shouldRetain(logEntry: LogEntry): Boolean = shouldRetainMessage(source, map, retainDeletes, logEntry, stats)
+    val logCleanerFilter = new RecordFilter {
+      def shouldRetain(record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats)
     }
 
     var position = 0
@@ -475,7 +474,7 @@ private[log] class Cleaner(val id: Int,
       if (writeBuffer.position > 0) {
         writeBuffer.flip()
         val retained = MemoryRecords.readableRecords(writeBuffer)
-        dest.append(firstOffset = retained.deepEntries.iterator.next().offset,
+        dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
           largestOffset = result.maxOffset,
           largestTimestamp = result.maxTimestamp,
           shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
@@ -493,22 +492,24 @@ private[log] class Cleaner(val id: Int,
   private def shouldRetainMessage(source: kafka.log.LogSegment,
                                   map: kafka.log.OffsetMap,
                                   retainDeletes: Boolean,
-                                  entry: LogEntry,
+                                  record: Record,
                                   stats: CleanerStats): Boolean = {
-    val pastLatestOffset = entry.offset > map.latestOffset
-    if (pastLatestOffset)
+    if (record.isControlRecord)
       return true
 
+    val pastLatestOffset = record.offset > map.latestOffset
+    if (pastLatestOffset)
+      return true
 
-    if (entry.record.hasKey) {
-      val key = entry.record.key
+    if (record.hasKey) {
+      val key = record.key
       val foundOffset = map.get(key)
       /* two cases in which we can get rid of a message:
        *   1) if there exists a message with the same key but higher offset
        *   2) if the message is a delete "tombstone" marker and enough time has passed
        */
-      val redundant = foundOffset >= 0 && entry.offset < foundOffset
-      val obsoleteDelete = !retainDeletes && entry.record.hasNullValue
+      val redundant = foundOffset >= 0 && record.offset < foundOffset
+      val obsoleteDelete = !retainDeletes && !record.hasValue
       !redundant && !obsoleteDelete
     } else {
       stats.invalidMessage()
@@ -630,11 +631,10 @@ private[log] class Cleaner(val id: Int,
       throttler.maybeThrottle(records.sizeInBytes)
 
       val startPosition = position
-      for (entry <- records.deepEntries.asScala) {
-        val message = entry.record
-        if (message.hasKey && entry.offset >= start) {
+      for (record <- records.records.asScala) {
+        if (!record.isControlRecord && record.hasKey && record.offset >= start) {
           if (map.size < maxDesiredMapSize)
-            map.put(message.key, entry.offset)
+            map.put(record.key, record.offset)
           else
             return true
         }
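
[Editorial sketch] The cleaner now filters at the record level through MemoryRecords.RecordFilter instead of the removed LogEntryFilter. A minimal sketch of the hook with a placeholder retention predicate (the real cleaner also consults the offset map, as shown in shouldRetainMessage above):

  import org.apache.kafka.common.record.Record
  import org.apache.kafka.common.record.MemoryRecords.RecordFilter

  // Placeholder predicate: keep control records and anything with a key.
  val retainKeyedRecords = new RecordFilter {
    def shouldRetain(record: Record): Boolean = record.isControlRecord || record.hasKey
  }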

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 15fa29a..9263515 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -223,26 +223,25 @@ class LogSegment(val log: FileRecords,
     timeIndex.resize(timeIndex.maxIndexSize)
     var validBytes = 0
     var lastIndexEntry = 0
-    maxTimestampSoFar = Record.NO_TIMESTAMP
+    maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
     try {
-      for (entry <- log.shallowEntries(maxMessageSize).asScala) {
-        val record = entry.record
-        record.ensureValid()
-
-        // The max timestamp should have been put in the outer message, so we don't need to iterate over the inner messages.
-        if (record.timestamp > maxTimestampSoFar) {
-          maxTimestampSoFar = record.timestamp
-          offsetOfMaxTimestamp = entry.offset
+      for (batch <- log.batches(maxMessageSize).asScala) {
+        batch.ensureValid()
+
+        // The max timestamp is exposed at the batch level, so no need to iterate the records
+        if (batch.maxTimestamp > maxTimestampSoFar) {
+          maxTimestampSoFar = batch.maxTimestamp
+          offsetOfMaxTimestamp = batch.lastOffset
         }
 
         // Build offset index
         if(validBytes - lastIndexEntry > indexIntervalBytes) {
-          val startOffset = entry.firstOffset
+          val startOffset = batch.baseOffset
           index.append(startOffset, validBytes)
           timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
           lastIndexEntry = validBytes
         }
-        validBytes += entry.sizeInBytes()
+        validBytes += batch.sizeInBytes()
       }
     } catch {
       case e: CorruptRecordException =>
@@ -294,7 +293,7 @@ class LogSegment(val log: FileRecords,
     // after truncation, reset and allocate more space for the (new currently  active) index
     index.resize(index.maxIndexSize)
     timeIndex.resize(timeIndex.maxIndexSize)
-    val bytesTruncated = log.truncateTo(mapping.position.toInt)
+    val bytesTruncated = log.truncateTo(mapping.position)
     if(log.sizeInBytes == 0) {
       created = time.milliseconds
       rollingBasedTimestamp = None
@@ -316,7 +315,7 @@ class LogSegment(val log: FileRecords,
     if (ms == null) {
       baseOffset
     } else {
-      ms.records.shallowEntries.asScala.lastOption match {
+      ms.records.batches.asScala.lastOption match {
         case None => baseOffset
         case Some(last) => last.nextOffset
       }
@@ -367,19 +366,19 @@ class LogSegment(val log: FileRecords,
 
   /**
    * The time this segment has waited to be rolled.
-   * If the first message has a timestamp we use the message timestamp to determine when to roll a segment. A segment
-   * is rolled if the difference between the new message's timestamp and the first message's timestamp exceeds the
+   * If the first message batch has a timestamp we use its timestamp to determine when to roll a segment. A segment
+   * is rolled if the difference between the new batch's timestamp and the first batch's timestamp exceeds the
    * segment rolling time.
-   * If the first message does not have a timestamp, we use the wall clock time to determine when to roll a segment. A
+   * If the first batch does not have a timestamp, we use the wall clock time to determine when to roll a segment. A
    * segment is rolled if the difference between the current wall clock time and the segment create time exceeds the
    * segment rolling time.
    */
   def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
     // Load the timestamp of the first message into memory
     if (rollingBasedTimestamp.isEmpty) {
-      val iter = log.shallowEntries.iterator()
+      val iter = log.batches.iterator()
       if (iter.hasNext)
-        rollingBasedTimestamp = Some(iter.next().record.timestamp)
+        rollingBasedTimestamp = Some(iter.next().maxTimestamp)
     }
     rollingBasedTimestamp match {
       case Some(t) if t >= 0 => messageTimestamp - t

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 26d6e8c..94e3608 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -20,27 +20,25 @@ import java.nio.ByteBuffer
 
 import kafka.common.LongRef
 import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec}
+import kafka.utils.Logging
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record._
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 
-private[kafka] object LogValidator {
+private[kafka] object LogValidator extends Logging {
 
   /**
    * Update the offsets for this message set and do further validation on messages including:
    * 1. Messages for compacted topics must have keys
-   * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
+   * 2. When magic value >= 1, inner messages of a compressed message set must have monotonically increasing offsets
    *    starting from 0.
-   * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
+   * 3. When magic value >= 1, validate and maybe overwrite timestamps of messages.
    *
-   * This method will convert the messages in the following scenarios:
-   * A. Magic value of a message = 0 and messageFormatVersion is 1
-   * B. Magic value of a message = 1 and messageFormatVersion is 0
-   *
-   * If no format conversion or value overwriting is required for messages, this method will perform in-place
-   * operations and avoid re-compression.
+   * This method will convert messages as necessary to the topic's configured message format version. If no format
+   * conversion or value overwriting is required for messages, this method will perform in-place operations to
+   * avoid expensive re-compression.
    *
    * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
    * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed.
@@ -51,12 +49,12 @@ private[kafka] object LogValidator {
                                                       sourceCodec: CompressionCodec,
                                                       targetCodec: CompressionCodec,
                                                       compactedTopic: Boolean = false,
-                                                      messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
+                                                      messageFormatVersion: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
                                                       messageTimestampType: TimestampType,
                                                       messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
-      if (!records.hasMatchingShallowMagic(messageFormatVersion))
+      if (!records.hasMatchingMagic(messageFormatVersion))
         convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, messageTimestampType,
           messageTimestampDiffMaxMs, messageFormatVersion)
       else
@@ -76,19 +74,27 @@ private[kafka] object LogValidator {
                                                    timestampType: TimestampType,
                                                    messageTimestampDiffMaxMs: Long,
                                                    toMagicValue: Byte): ValidationAndOffsetAssignResult = {
-    val sizeInBytesAfterConversion = records.shallowEntries.asScala.map { logEntry =>
-      logEntry.record.convertedSize(toMagicValue)
-    }.sum
+    val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
+      CompressionType.NONE, records.records)
+
+    val (pid, epoch, sequence) = {
+      val first = records.batches.asScala.head
+      (first.producerId, first.producerEpoch, first.baseSequence)
+    }
 
     val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
     val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
-      offsetCounter.value, now)
+      offsetCounter.value, now, pid, epoch, sequence)
 
-    records.shallowEntries.asScala.foreach { logEntry =>
-      val record = logEntry.record
-      validateKey(record, compactedTopic)
-      validateTimestamp(record, now, timestampType, messageTimestampDiffMaxMs)
-      builder.convertAndAppendWithOffset(offsetCounter.getAndIncrement(), record)
+    for (batch <- records.batches.asScala) {
+      ensureNonTransactional(batch)
+
+      for (record <- batch.asScala) {
+        ensureNotControlRecord(record)
+        validateKey(record, compactedTopic)
+        validateTimestamp(batch, record, now, timestampType, messageTimestampDiffMaxMs)
+        builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+      }
     }
 
     val convertedRecords = builder.build()
@@ -102,36 +108,44 @@ private[kafka] object LogValidator {
 
   private def assignOffsetsNonCompressed(records: MemoryRecords,
                                          offsetCounter: LongRef,
-                                         now: Long,
+                                         currentTimestamp: Long,
                                          compactedTopic: Boolean,
                                          timestampType: TimestampType,
                                          timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
-    var maxTimestamp = Record.NO_TIMESTAMP
+    var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
-    val firstOffset = offsetCounter.value
+    val initialOffset = offsetCounter.value
 
-    for (entry <- records.shallowEntries.asScala) {
-      val record = entry.record
-      validateKey(record, compactedTopic)
+    for (batch <- records.batches.asScala) {
+      ensureNonTransactional(batch)
 
-      val offset = offsetCounter.getAndIncrement()
-      entry.setOffset(offset)
+      for (record <- batch.asScala) {
+        record.ensureValid()
+        ensureNotControlRecord(record)
+        validateKey(record, compactedTopic)
 
-      if (record.magic > Record.MAGIC_VALUE_V0) {
-        validateTimestamp(record, now, timestampType, timestampDiffMaxMs)
+        val offset = offsetCounter.getAndIncrement()
+        if (batch.magic > RecordBatch.MAGIC_VALUE_V0) {
+          validateTimestamp(batch, record, currentTimestamp, timestampType, timestampDiffMaxMs)
 
-        if (timestampType == TimestampType.LOG_APPEND_TIME)
-          entry.setLogAppendTime(now)
-        else if (record.timestamp > maxTimestamp) {
-          maxTimestamp = record.timestamp
-          offsetOfMaxTimestamp = offset
+          if (record.timestamp > maxTimestamp) {
+            maxTimestamp = record.timestamp
+            offsetOfMaxTimestamp = offset
+          }
         }
       }
+
+      batch.setLastOffset(offsetCounter.value - 1)
+
+      // TODO: in the compressed path, we ensure that the batch max timestamp is correct.
+      //       We should either do the same or (better) let those two paths converge.
+      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.LOG_APPEND_TIME)
+        batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, currentTimestamp)
     }
 
     if (timestampType == TimestampType.LOG_APPEND_TIME) {
-      maxTimestamp = now
-      offsetOfMaxTimestamp = firstOffset
+      maxTimestamp = currentTimestamp
+      offsetOfMaxTimestamp = initialOffset
     }
 
     ValidationAndOffsetAssignResult(
@@ -148,78 +162,110 @@ private[kafka] object LogValidator {
    * 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
    * 4. Message format conversion is needed.
    */
-  private def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
-                                                         offsetCounter: LongRef,
-                                                         now: Long,
-                                                         sourceCodec: CompressionCodec,
-                                                         targetCodec: CompressionCodec,
-                                                         compactedTopic: Boolean = false,
-                                                         messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
-                                                         messageTimestampType: TimestampType,
-                                                         messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
-    // No in place assignment situation 1 and 2
-    var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0
-
-    var maxTimestamp = Record.NO_TIMESTAMP
-    val expectedInnerOffset = new LongRef(0)
-    val validatedRecords = new mutable.ArrayBuffer[Record]
-
-    records.deepEntries(true).asScala.foreach { logEntry =>
-      val record = logEntry.record
-      validateKey(record, compactedTopic)
-
-      if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) {
-        // Validate the timestamp
-        validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs)
-        // Check if we need to overwrite offset, no in place assignment situation 3
-        if (logEntry.offset != expectedInnerOffset.getAndIncrement())
-          inPlaceAssignment = false
-        if (record.timestamp > maxTimestamp)
-          maxTimestamp = record.timestamp
+  def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
+                                                 offsetCounter: LongRef,
+                                                 currentTimestamp: Long,
+                                                 sourceCodec: CompressionCodec,
+                                                 targetCodec: CompressionCodec,
+                                                 compactedTopic: Boolean = false,
+                                                 messageFormatVersion: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
+                                                 messageTimestampType: TimestampType,
+                                                 messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+
+      // No in place assignment situation 1 and 2
+      var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > RecordBatch.MAGIC_VALUE_V0
+
+      var maxTimestamp = RecordBatch.NO_TIMESTAMP
+      val expectedInnerOffset = new LongRef(0)
+      val validatedRecords = new mutable.ArrayBuffer[Record]
+
+      for (batch <- records.batches.asScala) {
+        ensureNonTransactional(batch)
+
+        for (record <- batch.asScala) {
+          if (!record.hasMagic(batch.magic))
+            throw new InvalidRecordException(s"Log record magic does not match outer magic ${batch.magic}")
+
+          record.ensureValid()
+          ensureNotControlRecord(record)
+          validateKey(record, compactedTopic)
+
+          if (!record.hasMagic(RecordBatch.MAGIC_VALUE_V0) && messageFormatVersion > RecordBatch.MAGIC_VALUE_V0) {
+            // Validate the timestamp
+            validateTimestamp(batch, record, currentTimestamp, messageTimestampType, messageTimestampDiffMaxMs)
+            // Check if we need to overwrite offset
+            // No in place assignment situation 3
+            if (record.offset != expectedInnerOffset.getAndIncrement())
+              inPlaceAssignment = false
+            if (record.timestamp > maxTimestamp)
+              maxTimestamp = record.timestamp
+          }
+
+          if (sourceCodec != NoCompressionCodec && record.isCompressed)
+            throw new InvalidMessageException("Compressed outer record should not have an inner record with a " +
+              s"compression attribute set: $record")
+
+          // No in place assignment situation 4
+          if (!record.hasMagic(messageFormatVersion))
+            inPlaceAssignment = false
+
+          validatedRecords += record
+        }
       }
 
-      if (sourceCodec != NoCompressionCodec && logEntry.isCompressed)
-        throw new InvalidMessageException("Compressed outer record should not have an inner record with a " +
-          s"compression attribute set: $record")
+      if (!inPlaceAssignment) {
+        buildRecordsAndAssignOffsets(messageFormatVersion, offsetCounter, messageTimestampType,
+          CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords)
+      } else {
+        // we can update the batch only and write the compressed payload as is
+        val batch = records.batches.iterator.next()
+        val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
 
-      // No in place assignment situation 4
-      if (record.magic != messageFormatVersion)
-        inPlaceAssignment = false
+        batch.setLastOffset(lastOffset)
 
-      validatedRecords += record.convert(messageFormatVersion, messageTimestampType)
-    }
+        if (messageTimestampType == TimestampType.LOG_APPEND_TIME)
+          maxTimestamp = currentTimestamp
 
-    if (!inPlaceAssignment) {
-      val entries = validatedRecords.map(record => LogEntry.create(offsetCounter.getAndIncrement(), record))
-      val builder = MemoryRecords.builderWithEntries(messageTimestampType, CompressionType.forId(targetCodec.codec),
-        now, entries.asJava)
-      val updatedRecords = builder.build()
-      val info = builder.info
-      ValidationAndOffsetAssignResult(
-        validatedRecords = updatedRecords,
-        maxTimestamp = info.maxTimestamp,
-        shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
-        messageSizeMaybeChanged = true)
-    } else {
-      // ensure the inner messages are valid
-      validatedRecords.foreach(_.ensureValid)
-
-      // we can update the wrapper message only and write the compressed payload as is
-      val entry = records.shallowEntries.iterator.next()
-      val offset = offsetCounter.addAndGet(validatedRecords.size) - 1
-      entry.setOffset(offset)
-
-      val shallowTimestamp = if (messageTimestampType == TimestampType.LOG_APPEND_TIME) now else maxTimestamp
-      if (messageTimestampType == TimestampType.LOG_APPEND_TIME)
-        entry.setLogAppendTime(shallowTimestamp)
-      else if (messageTimestampType == TimestampType.CREATE_TIME)
-        entry.setCreateTime(shallowTimestamp)
-
-      ValidationAndOffsetAssignResult(validatedRecords = records,
-        maxTimestamp = shallowTimestamp,
-        shallowOffsetOfMaxTimestamp = offset,
-        messageSizeMaybeChanged = false)
+        if (messageFormatVersion >= RecordBatch.MAGIC_VALUE_V1)
+          batch.setMaxTimestamp(messageTimestampType, maxTimestamp)
+
+        ValidationAndOffsetAssignResult(validatedRecords = records,
+          maxTimestamp = maxTimestamp,
+          shallowOffsetOfMaxTimestamp = lastOffset,
+          messageSizeMaybeChanged = false)
+      }
+  }
+
+  private def buildRecordsAndAssignOffsets(magic: Byte, offsetCounter: LongRef, timestampType: TimestampType,
+                                           compressionType: CompressionType, logAppendTime: Long,
+                                           validatedRecords: Seq[Record]): ValidationAndOffsetAssignResult = {
+    val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava)
+    val buffer = ByteBuffer.allocate(estimatedSize)
+    val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value, logAppendTime)
+
+    validatedRecords.foreach { record =>
+      builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
     }
+
+    val records = builder.build()
+    val info = builder.info
+
+    ValidationAndOffsetAssignResult(
+      validatedRecords = records,
+      maxTimestamp = info.maxTimestamp,
+      shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
+      messageSizeMaybeChanged = true)
+  }
+
+  private def ensureNonTransactional(batch: RecordBatch) {
+    if (batch.isTransactional)
+      throw new InvalidRecordException("Transactional messages are not currently supported")
+  }
+
+  private def ensureNotControlRecord(record: Record) {
+    // Until we have implemented transaction support, we do not permit control records to be written
+    if (record.isControlRecord)
+      throw new InvalidRecordException("Control messages are not currently supported")
   }
 
   private def validateKey(record: Record, compactedTopic: Boolean) {
@@ -231,16 +277,17 @@ private[kafka] object LogValidator {
    * This method validates the timestamps of a message.
    * If the message is using create time, this method checks if it is within acceptable range.
    */
-  private def validateTimestamp(record: Record,
+  private def validateTimestamp(batch: RecordBatch,
+                                record: Record,
                                 now: Long,
                                 timestampType: TimestampType,
                                 timestampDiffMaxMs: Long) {
     if (timestampType == TimestampType.CREATE_TIME
-      && record.timestamp != Record.NO_TIMESTAMP
+      && record.timestamp != RecordBatch.NO_TIMESTAMP
       && math.abs(record.timestamp - now) > timestampDiffMaxMs)
       throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message is out of range. " +
         s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
-    if (record.timestampType == TimestampType.LOG_APPEND_TIME)
+    if (batch.timestampType == TimestampType.LOG_APPEND_TIME)
       throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
         s"timestamp type to LogAppendTime.")
   }
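
A note on the validator changes in this file: when validation may change the serialized size (format conversion or recompression), the records are rebuilt through MemoryRecords.builder with explicitly assigned offsets; otherwise the existing batch is updated in place and only the max timestamp is touched. Below is a minimal sketch of the rebuild path using only the builder calls visible in this patch; the helper name and setup are illustrative, not part of the commit.

import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, TimestampType}

// Illustrative sketch: rebuild already-validated records with freshly assigned offsets,
// mirroring buildRecordsAndAssignOffsets above. Names not in the patch are assumptions.
def rebuildWithOffsets(validated: Seq[Record], baseOffset: Long, logAppendTime: Long): MemoryRecords = {
  val magic = RecordBatch.CURRENT_MAGIC_VALUE
  val sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, baseOffset, CompressionType.NONE, validated.asJava)
  val builder = MemoryRecords.builder(ByteBuffer.allocate(sizeEstimate), magic, CompressionType.NONE,
    TimestampType.LOG_APPEND_TIME, baseOffset, logAppendTime)
  validated.zipWithIndex.foreach { case (record, i) =>
    builder.appendWithOffset(baseOffset + i, record)   // offsets are assigned explicitly, one per record
  }
  builder.build()   // builder.info then carries maxTimestamp and shallowOffsetOfMaxTimestamp for the result
}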

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 21c212f..5f2b387 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import kafka.common.InvalidOffsetException
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
-import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.record.RecordBatch
 
 /**
  * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
@@ -69,7 +69,7 @@ class TimeIndex(file: File,
   def lastEntry: TimestampOffset = {
     inLock(lock) {
       _entries match {
-        case 0 => TimestampOffset(Record.NO_TIMESTAMP, baseOffset)
+        case 0 => TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
         case s => parseEntry(mmap, s - 1).asInstanceOf[TimestampOffset]
       }
     }
@@ -145,7 +145,7 @@ class TimeIndex(file: File,
       val idx = mmap.duplicate
       val slot = indexSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
       if (slot == -1)
-        TimestampOffset(Record.NO_TIMESTAMP, baseOffset)
+        TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
       else {
         val entry = parseEntry(idx, slot).asInstanceOf[TimestampOffset]
         TimestampOffset(entry.timestamp, entry.offset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 198a4c3..e1d2882 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -29,20 +29,18 @@ object ByteBufferMessageSet {
 
   private def create(offsetAssigner: OffsetAssigner,
                      compressionCodec: CompressionCodec,
-                     wrapperMessageTimestamp: Option[Long],
                      timestampType: TimestampType,
                      messages: Message*): ByteBuffer = {
     if (messages.isEmpty)
       MessageSet.Empty.buffer
     else {
-      val magicAndTimestamp = wrapperMessageTimestamp match {
-        case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
-        case None => MessageSet.magicAndLargestTimestamp(messages)
-      }
-
-      val entries = messages.map(message => LogEntry.create(offsetAssigner.nextAbsoluteOffset(), message.asRecord))
-      val builder = MemoryRecords.builderWithEntries(timestampType, CompressionType.forId(compressionCodec.codec),
-        magicAndTimestamp.timestamp, entries.asJava)
+      val buffer = ByteBuffer.allocate(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
+      val builder = MemoryRecords.builder(buffer, messages.head.magic, CompressionType.forId(compressionCodec.codec),
+        timestampType, offsetAssigner.baseOffset)
+
+      for (message <- messages)
+        builder.appendWithOffset(offsetAssigner.nextAbsoluteOffset(), message.asRecord)
+
       builder.build().buffer
     }
   }
@@ -65,6 +63,8 @@ private class OffsetAssigner(offsets: Seq[Long]) {
     result
   }
 
+  def baseOffset = offsets.head
+
   def toInnerOffset(offset: Long): Long = offset - offsets.head
 
 }
@@ -130,20 +130,19 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
 
   private[kafka] def this(compressionCodec: CompressionCodec,
                           offsetCounter: LongRef,
-                          wrapperMessageTimestamp: Option[Long],
                           timestampType: TimestampType,
                           messages: Message*) {
     this(ByteBufferMessageSet.create(OffsetAssigner(offsetCounter, messages.size), compressionCodec,
-      wrapperMessageTimestamp, timestampType, messages:_*))
+      timestampType, messages:_*))
   }
 
   def this(compressionCodec: CompressionCodec, offsetCounter: LongRef, messages: Message*) {
-    this(compressionCodec, offsetCounter, None, TimestampType.CREATE_TIME, messages:_*)
+    this(compressionCodec, offsetCounter, TimestampType.CREATE_TIME, messages:_*)
   }
 
   def this(compressionCodec: CompressionCodec, offsetSeq: Seq[Long], messages: Message*) {
     this(ByteBufferMessageSet.create(new OffsetAssigner(offsetSeq), compressionCodec,
-      None, TimestampType.CREATE_TIME, messages:_*))
+      TimestampType.CREATE_TIME, messages:_*))
   }
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
@@ -166,11 +165,10 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
 
   /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
   private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
-    val entries = if (isShallow)
-      asRecords.shallowEntries
+    if (isShallow)
+      asRecords.batches.asScala.iterator.map(batch => MessageAndOffset.fromRecordBatch(batch.asInstanceOf[AbstractLegacyRecordBatch]))
     else
-      asRecords.deepEntries
-    entries.iterator.asScala.map(MessageAndOffset.fromLogEntry)
+      asRecords.records.asScala.iterator.map(record => MessageAndOffset.fromRecordBatch(record.asInstanceOf[AbstractLegacyRecordBatch]))
   }
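
The rewritten create method above sizes its initial buffer with a simple heuristic: start from half the uncompressed message-set size (on the assumption that compression roughly halves the payload), but never less than 1 KiB and never more than 64 KiB up front; the builder is expected to grow the buffer if that estimate turns out to be too small. A worked sketch of the arithmetic, with illustrative values:

// Illustrative only: the initial allocation heuristic used by create(...) above.
def initialBufferSize(uncompressedSetSize: Int): Int =
  math.min(math.max(uncompressedSetSize / 2, 1024), 1 << 16)

// Examples:
//   initialBufferSize(1200)   == 1024    (small sets still get at least 1 KiB)
//   initialBufferSize(200000) == 65536   (large sets are capped at 64 KiB up front)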
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 474f934..3530929 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -19,12 +19,10 @@ package kafka.message
 
 import java.nio._
 
-import org.apache.kafka.common.record.{Record, TimestampType}
+import org.apache.kafka.common.record.{CompressionType, LegacyRecord, TimestampType}
 
 import scala.math._
-import kafka.utils._
-import org.apache.kafka.common.utils.ByteUtils.{readUnsignedInt, writeUnsignedInt}
-import org.apache.kafka.common.utils.{ByteUtils, Utils}
+import org.apache.kafka.common.utils.{ByteUtils, Crc32}
 
 /**
  * Constants related to messages
@@ -99,7 +97,7 @@ object Message {
     MessageHeaderSizeMap(toMagicValue) - MessageHeaderSizeMap(fromMagicValue)
 
 
-  def fromRecord(record: Record): Message = {
+  def fromRecord(record: LegacyRecord): Message = {
     val wrapperTimestamp: Option[Long] = if (record.wrapperRecordTimestamp == null) None else Some(record.wrapperRecordTimestamp)
     val wrapperTimestampType = Option(record.wrapperRecordTimestampType)
     new Message(record.buffer, wrapperTimestamp, wrapperTimestampType)
@@ -140,9 +138,9 @@ class Message(val buffer: ByteBuffer,
   
   import kafka.message.Message._
 
-  private[message] def asRecord: Record = wrapperMessageTimestamp match {
-    case None => new Record(buffer)
-    case Some(timestamp) => new Record(buffer, timestamp, wrapperMessageTimestampType.orNull)
+  private[message] def asRecord: LegacyRecord = wrapperMessageTimestamp match {
+    case None => new LegacyRecord(buffer)
+    case Some(timestamp) => new LegacyRecord(buffer, timestamp, wrapperMessageTimestampType.orNull)
   }
 
   /**
@@ -179,10 +177,7 @@ class Message(val buffer: ByteBuffer,
     // skip crc, we will fill that in at the end
     buffer.position(MagicOffset)
     buffer.put(magicValue)
-    val attributes: Byte =
-      if (codec.codec > 0)
-        timestampType.updateAttributes((CompressionCodeMask & codec.codec).toByte)
-      else 0
+    val attributes: Byte = LegacyRecord.computeAttributes(magicValue, CompressionType.forId(codec.codec), timestampType)
     buffer.put(attributes)
     // Only put timestamp when "magic" value is greater than 0
     if (magic > MagicValue_V0)
@@ -204,7 +199,10 @@ class Message(val buffer: ByteBuffer,
     // now compute the checksum and fill it in
     ByteUtils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
   }
-  
+
+  def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, timestampType: TimestampType, codec: CompressionCodec, magicValue: Byte) =
+    this(bytes = bytes, key = key, timestamp = timestamp, timestampType = timestampType, codec = codec, payloadOffset = 0, payloadSize = -1, magicValue = magicValue)
+
   def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
     this(bytes = bytes, key = key, timestamp = timestamp, timestampType = TimestampType.CREATE_TIME, codec = codec, payloadOffset = 0, payloadSize = -1, magicValue = magicValue)
   
@@ -224,7 +222,7 @@ class Message(val buffer: ByteBuffer,
    * Compute the checksum of the message from the message contents
    */
   def computeChecksum: Long =
-    Utils.computeChecksum(buffer, MagicOffset, buffer.limit - MagicOffset)
+    Crc32.crc32(buffer, MagicOffset, buffer.limit - MagicOffset)
   
   /**
    * Retrieve the previously computed CRC for this message
@@ -315,13 +313,8 @@ class Message(val buffer: ByteBuffer,
   /**
    * The timestamp type of the message
    */
-  def timestampType = {
-    if (magic == MagicValue_V0)
-      TimestampType.NO_TIMESTAMP_TYPE
-    else
-      wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))
-  }
-  
+  def timestampType = LegacyRecord.timestampType(magic, wrapperMessageTimestampType.orNull, attributes)
+
   /**
    * The compression codec used with this message
    */
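
Two smaller changes in Message.scala are worth calling out: the checksum now goes through Crc32.crc32 directly, and the attributes byte is computed by LegacyRecord.computeAttributes instead of being assembled by hand. For the legacy (magic 0/1) format the attributes byte is conventionally laid out with the compression codec in the low three bits and, for magic 1, the timestamp type in bit 3; the sketch below shows that assumed layout for illustration only and is not the committed implementation.

// Assumed legacy attributes layout, for illustration only:
//   bits 0-2: compression codec id
//   bit 3   : timestamp type (0 = CreateTime, 1 = LogAppendTime), magic >= 1 only
def legacyAttributes(magic: Byte, codecId: Int, logAppendTime: Boolean): Byte = {
  val compressionBits = codecId & 0x07
  val timestampBit = if (magic > 0 && logAppendTime) 0x08 else 0x00
  (compressionBits | timestampBit).toByte
}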

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/message/MessageAndOffset.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala
index 46630c6..349e90b 100644
--- a/core/src/main/scala/kafka/message/MessageAndOffset.scala
+++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala
@@ -17,11 +17,11 @@
 
 package kafka.message
 
-import org.apache.kafka.common.record.LogEntry
+import org.apache.kafka.common.record.AbstractLegacyRecordBatch
 
 object MessageAndOffset {
-  def fromLogEntry(logEntry : LogEntry): MessageAndOffset = {
-    MessageAndOffset(Message.fromRecord(logEntry.record), logEntry.offset)
+  def fromRecordBatch(recordBatch: AbstractLegacyRecordBatch): MessageAndOffset = {
+    MessageAndOffset(Message.fromRecord(recordBatch.outerRecord), recordBatch.lastOffset)
   }
 }
 
@@ -32,13 +32,5 @@ case class MessageAndOffset(message: Message, offset: Long) {
    */
   def nextOffset: Long = offset + 1
 
-  /**
-   * We need to decompress the message, if required, to get the offset of the first uncompressed message.
-   */
-  def firstOffset: Long = toLogEntry.firstOffset
-
-  def toLogEntry: LogEntry = {
-    LogEntry.create(offset, message.asRecord)
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index 2fe54cd..915def0 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -93,7 +93,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
   override def toString: String = {
     val builder = new StringBuilder()
     builder.append(getClass.getSimpleName + "(")
-    val iter = this.asRecords.shallowEntries.iterator
+    val iter = this.asRecords.batches.iterator
     var i = 0
     while(iter.hasNext && i < 100) {
       val message = iter.next

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index d3b1a0a..7bfe76a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
-import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.record.{RecordBatch, MemoryRecords}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
@@ -46,7 +46,8 @@ object RequestChannel extends Logging {
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
   private def shutdownReceive: ByteBuffer = {
-    val emptyProduceRequest = new ProduceRequest.Builder(0, 0, Collections.emptyMap[TopicPartition, MemoryRecords]).build()
+    val emptyProduceRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 0, 0,
+      Collections.emptyMap[TopicPartition, MemoryRecords]).build()
     val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, emptyProduceRequest.version, "", 0)
     emptyProduceRequest.serialize(emptyRequestHeader)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 0eb3ad8..8842724 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -145,7 +145,7 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.NONE =>
                   try {
                     val records = partitionData.toRecords
-                    val newOffset = records.shallowEntries.asScala.lastOption.map(_.nextOffset).getOrElse(
+                    val newOffset = records.batches.asScala.lastOption.map(_.nextOffset).getOrElse(
                       currentPartitionFetchState.offset)
 
                     fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
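
With shallow entries gone, the fetcher derives its next fetch offset from the last returned batch: a batch's nextOffset is taken to be the offset immediately after its last record, and the partition's lag is measured from there against the leader's high watermark. A small worked example with made-up values:

// Illustrative arithmetic only; assumes nextOffset = lastOffset + 1 for a batch.
val lastOffsetOfLastBatch = 41L
val newOffset = lastOffsetOfLastBatch + 1              // next fetch starts at 42
val highWatermark = 50L
val lag = Math.max(0L, highWatermark - newOffset)      // 8; clamped at 0 once the follower is caught up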

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d2f8dba..0798efd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
-import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType}
+import org.apache.kafka.common.record.{RecordBatch, MemoryRecords, TimestampType}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -450,11 +450,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
-      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, -1, MemoryRecords.EMPTY))
+      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
     }
 
     val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
-      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -1, MemoryRecords.EMPTY))
+      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+        FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
     }
 
     // the callback for sending a fetch response
@@ -462,22 +464,27 @@ class KafkaApis(val requestChannel: RequestChannel,
       val convertedPartitionData = {
         responsePartitionData.map { case (tp, data) =>
 
-          // We only do down-conversion when:
-          // 1. The message format version configured for the topic is using magic value > 0, and
-          // 2. The message set contains message whose magic > 0
-          // This is to reduce the message format conversion as much as possible. The conversion will only occur
-          // when new message format is used for the topic and we see an old request.
-          // Please note that if the message format is changed from a higher version back to lower version this
-          // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0
-          // without format down conversion.
-          val convertedData = if (versionId <= 1 && replicaManager.getMagic(tp).exists(_ > Record.MAGIC_VALUE_V0) &&
-            !data.records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0)) {
-            trace(s"Down converting message to V0 for fetch request from $clientId")
-            val downConvertedRecords = data.records.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE)
-            FetchPartitionData(data.error, data.hw, downConvertedRecords)
-          } else data
-
-          tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, convertedData.records)
+          // Down-conversion of the fetched records is needed when the stored magic version is
+          // greater than that supported by the client (as indicated by the fetch request version). If the
+          // configured magic version for the topic is less than or equal to that supported by the version of the
+          // fetch request, we skip the iteration through the records in order to check the magic version since we
+          // know it must be supported. However, if the magic version is changed from a higher version back to a
+          // lower version, this check will no longer be valid and we will fail to down-convert the messages
+          // which were written in the new format prior to the version downgrade.
+          val convertedData = replicaManager.getMagic(tp) match {
+            case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) =>
+              trace(s"Down converting message to V0 for fetch request from $clientId")
+              FetchPartitionData(data.error, data.hw, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
+
+            case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) =>
+              trace(s"Down converting message to V1 for fetch request from $clientId")
+              FetchPartitionData(data.error, data.hw, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
+
+            case _ => data
+          }
+
+          tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LSO,
+            null, convertedData.records)
         }
       }
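
The match expression above replaces the old shallow-magic check: the fetch request version implies the newest magic the client can read (v0/v1 fetchers read only magic 0, v2/v3 fetchers read up to magic 1, v4 and later read the new format), and records are down-converted only when the topic's configured magic is newer than that and the fetched data is not already compatible. A condensed sketch of the decision, with names that are illustrative rather than taken from the commit:

import org.apache.kafka.common.record.RecordBatch

// Illustrative decision only; assumes fetch v4+ implies the new magic v2 format.
def targetMagic(fetchVersion: Short): Byte =
  if (fetchVersion <= 1) RecordBatch.MAGIC_VALUE_V0
  else if (fetchVersion <= 3) RecordBatch.MAGIC_VALUE_V1
  else RecordBatch.MAGIC_VALUE_V2

def needsDownConversion(topicMagic: Byte, fetchVersion: Short, alreadyCompatible: Boolean): Boolean =
  topicMagic > targetMagic(fetchVersion) && !alreadyCompatible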
 
@@ -652,7 +659,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
           val found = {
             if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
-              TimestampOffset(Record.NO_TIMESTAMP, localReplica.highWatermark.messageOffset)
+              TimestampOffset(RecordBatch.NO_TIMESTAMP, localReplica.highWatermark.messageOffset)
             else {
               def allowed(timestampOffset: TimestampOffset): Boolean =
                 !fromConsumer || timestampOffset.offset <= localReplica.highWatermark.messageOffset

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a15f034..879bc51 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import java.util
 import java.util.Properties
 
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
@@ -29,13 +28,12 @@ import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.ConfigDef.ValidList
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
-import org.apache.kafka.common.metrics.{MetricsReporter, Sensor}
+import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.server.policy.CreateTopicPolicy
 
-import scala.collection.{Map, immutable}
+import scala.collection.Map
 import scala.collection.JavaConverters._
 
 object Defaults {
@@ -382,7 +380,9 @@ object KafkaConfig {
   val BrokerIdDoc = "The broker id for this server. If unset, a unique broker id will be generated." +
   "To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids " +
   "start from " + MaxReservedBrokerIdProp + " + 1."
-  val MessageMaxBytesDoc = "The maximum size of message that the server can receive"
+  val MessageMaxBytesDoc = "The maximum message size that the server can receive. Note that this limit also applies " +
+    "to the total size of a compressed batch of messages (when compression is enabled). Additionally, in versions " +
+    "0.11 and later, all messages are written as batches and this setting applies to the total size of the batch."
   val NumNetworkThreadsDoc = "the number of network threads that the server uses for handling network requests"
   val NumIoThreadsDoc = "The number of io threads that the server uses for carrying out network requests"
   val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
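
The expanded MessageMaxBytesDoc is a behavioral note as much as a doc fix: from 0.11 on, all messages are written as batches, so the limit is checked against the whole (possibly compressed) batch rather than individual messages. A rough back-of-the-envelope check, assuming the commonly used ~1 MB broker default for message.max.bytes (the exact default is not part of this patch):

// Illustrative arithmetic only; the default below is an assumption, not taken from this commit.
val messageMaxBytes = 1000012                          // assumed broker default (~1 MB)
val recordCount = 2000
val approxRecordSize = 600                             // bytes per record, before compression
val approxBatchSize = recordCount * approxRecordSize   // 1 200 000 bytes
val fitsUncompressed = approxBatchSize <= messageMaxBytes // false: only compression can bring the batch under the limit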

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index fb8a48f..465b0b7 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -37,7 +37,7 @@ import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
@@ -388,7 +388,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           config.requestTimeoutMs,
           time,
-          false)
+          false,
+          new ApiVersions)
       }
 
       var shutdownSucceeded: Boolean = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 3b6adec..29a2467 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -23,11 +23,11 @@ import java.util
 import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
-import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0}
+import kafka.api.{FetchRequest => _, _}
 import kafka.common.KafkaStorageException
 import ReplicaFetcherThread._
 import kafka.utils.Exit
-import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
@@ -60,7 +60,8 @@ class ReplicaFetcherThread(name: String,
   type PD = PartitionData
 
   private val fetchRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
     else 0
@@ -107,7 +108,8 @@ class ReplicaFetcherThread(name: String,
       brokerConfig.replicaSocketReceiveBufferBytes,
       brokerConfig.requestTimeoutMs,
       time,
-      false
+      false,
+      new ApiVersions
     )
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4ab8c2a..5ba093e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -339,7 +339,7 @@ class ReplicaManager(val config: KafkaConfig,
       // Just return an error and don't handle the request at all
       val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
         topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
-          LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP)
+          LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP)
       }
       responseCallback(responseStatus)
     }
@@ -384,6 +384,7 @@ class ReplicaManager(val config: KafkaConfig,
           val info = partitionOpt match {
             case Some(partition) =>
               partition.appendRecordsToLeader(records, requiredAcks)
+
             case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
               .format(topicPartition, localBrokerId))
           }

