kafka-commits mailing list archives

From j...@apache.org
Subject [3/6] kafka git commit: KAFKA-5121; Implement transaction index for KIP-98
Date Sat, 06 May 2017 18:51:12 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/ProducerIdMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerIdMapping.scala b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
deleted file mode 100644
index bcadce5..0000000
--- a/core/src/main/scala/kafka/log/ProducerIdMapping.scala
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.log
-
-import java.io._
-import java.nio.ByteBuffer
-import java.nio.file.Files
-
-import kafka.common.KafkaException
-import kafka.utils.{Logging, nonthreadsafe}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{DuplicateSequenceNumberException, OutOfOrderSequenceException, ProducerFencedException}
-import org.apache.kafka.common.protocol.types._
-import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
-
-import scala.collection.{immutable, mutable}
-
-private[log] object ProducerIdEntry {
-  val Empty = ProducerIdEntry(RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-    -1, 0, RecordBatch.NO_TIMESTAMP)
-}
-
-private[log] case class ProducerIdEntry(epoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) {
-  def firstSeq: Int = lastSeq - offsetDelta
-  def firstOffset: Long = lastOffset - offsetDelta
-
-  def isDuplicate(batch: RecordBatch): Boolean = {
-    batch.producerEpoch == epoch &&
-      batch.baseSequence == firstSeq &&
-      batch.lastSequence == lastSeq
-  }
-}
-
-private[log] class ProducerAppendInfo(val pid: Long, initialEntry: ProducerIdEntry) {
-  // the initialEntry here is the last successful appended batch. we validate incoming entries transitively, starting
-  // with the last appended entry.
-  private var epoch = initialEntry.epoch
-  private var firstSeq = initialEntry.firstSeq
-  private var lastSeq = initialEntry.lastSeq
-  private var lastOffset = initialEntry.lastOffset
-  private var maxTimestamp = initialEntry.timestamp
-
-  private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int) = {
-    if (this.epoch > epoch) {
-      throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer with a newer epoch. $epoch (request epoch), ${this.epoch} (server epoch)")
-    } else if (this.epoch == RecordBatch.NO_PRODUCER_EPOCH || this.epoch < epoch) {
-      if (firstSeq != 0)
-        throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " +
-          s"(request epoch), $firstSeq (seq. number)")
-    } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
-      throw new DuplicateSequenceNumberException(s"Duplicate sequence number: $pid (pid), $firstSeq " +
-        s"(seq. number), ${this.firstSeq} (expected seq. number)")
-    } else if (firstSeq != this.lastSeq + 1L) {
-      throw new OutOfOrderSequenceException(s"Invalid sequence number: $pid (pid), $firstSeq " +
-        s"(seq. number), ${this.lastSeq} (expected seq. number)")
-    }
-  }
-
-  def assignLastOffsetAndTimestamp(lastOffset: Long, lastTimestamp: Long): Unit = {
-    this.lastOffset = lastOffset
-    this.maxTimestamp = lastTimestamp
-  }
-
-  private def append(epoch: Short, firstSeq: Int, lastSeq: Int, lastTimestamp: Long, lastOffset: Long) {
-    validateAppend(epoch, firstSeq, lastSeq)
-    this.epoch = epoch
-    this.firstSeq = firstSeq
-    this.lastSeq = lastSeq
-    this.maxTimestamp = lastTimestamp
-    this.lastOffset = lastOffset
-  }
-
-  def append(batch: RecordBatch): Unit =
-    append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset)
-
-  def append(entry: ProducerIdEntry): Unit =
-    append(entry.epoch, entry.firstSeq, entry.lastSeq, entry.timestamp, entry.lastOffset)
-
-  def lastEntry: ProducerIdEntry =
-    ProducerIdEntry(epoch, lastSeq, lastOffset, lastSeq - firstSeq, maxTimestamp)
-}
-
-class CorruptSnapshotException(msg: String) extends KafkaException(msg)
-
-object ProducerIdMapping {
-  private val PidSnapshotVersion: Short = 1
-  private val VersionField = "version"
-  private val CrcField = "crc"
-  private val PidField = "pid"
-  private val LastSequenceField = "last_sequence"
-  private val EpochField = "epoch"
-  private val LastOffsetField = "last_offset"
-  private val OffsetDeltaField = "offset_delta"
-  private val TimestampField = "timestamp"
-  private val PidEntriesField = "pid_entries"
-
-  private val VersionOffset = 0
-  private val CrcOffset = VersionOffset + 2
-  private val PidEntriesOffset = CrcOffset + 4
-
-  private val maxPidSnapshotsToRetain = 2
-
-  val PidSnapshotEntrySchema = new Schema(
-    new Field(PidField, Type.INT64, "The producer ID"),
-    new Field(EpochField, Type.INT16, "Current epoch of the producer"),
-    new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"),
-    new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"),
-    new Field(OffsetDeltaField, Type.INT32, "The difference of the last sequence and first sequence in the last written batch"),
-    new Field(TimestampField, Type.INT64, "Max timestamp from the last written entry"))
-  val PidSnapshotMapSchema = new Schema(
-    new Field(VersionField, Type.INT16, "Version of the snapshot file"),
-    new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
-    new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table"))
-
-  def readSnapshot(file: File): Iterable[(Long, ProducerIdEntry)] = {
-    val buffer = Files.readAllBytes(file.toPath)
-    val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
-
-    val version = struct.getShort(VersionField)
-    if (version != PidSnapshotVersion)
-      throw new IllegalArgumentException(s"Unhandled snapshot file version $version")
-
-    val crc = struct.getUnsignedInt(CrcField)
-    val computedCrc =  Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset)
-    if (crc != computedCrc)
-      throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). Stored crc: $crc. Computed crc: $computedCrc")
-
-    struct.getArray(PidEntriesField).map { pidEntryObj =>
-      val pidEntryStruct = pidEntryObj.asInstanceOf[Struct]
-      val pid: Long = pidEntryStruct.getLong(PidField)
-      val epoch = pidEntryStruct.getShort(EpochField)
-      val seq = pidEntryStruct.getInt(LastSequenceField)
-      val offset = pidEntryStruct.getLong(LastOffsetField)
-      val timestamp = pidEntryStruct.getLong(TimestampField)
-      val offsetDelta = pidEntryStruct.getInt(OffsetDeltaField)
-      val newEntry = ProducerIdEntry(epoch, seq, offset, offsetDelta, timestamp)
-      pid -> newEntry
-    }
-  }
-
-  private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) {
-    val struct = new Struct(PidSnapshotMapSchema)
-    struct.set(VersionField, PidSnapshotVersion)
-    struct.set(CrcField, 0L) // we'll fill this after writing the entries
-    val entriesArray = entries.map {
-      case (pid, entry) =>
-        val pidEntryStruct = struct.instance(PidEntriesField)
-        pidEntryStruct.set(PidField, pid)
-          .set(EpochField, entry.epoch)
-          .set(LastSequenceField, entry.lastSeq)
-          .set(LastOffsetField, entry.lastOffset)
-          .set(OffsetDeltaField, entry.offsetDelta)
-          .set(TimestampField, entry.timestamp)
-        pidEntryStruct
-    }.toArray
-    struct.set(PidEntriesField, entriesArray)
-
-    val buffer = ByteBuffer.allocate(struct.sizeOf)
-    struct.writeTo(buffer)
-    buffer.flip()
-
-    // now fill in the CRC
-    val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset)
-    ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
-
-    val fos = new FileOutputStream(file)
-    try {
-      fos.write(buffer.array, buffer.arrayOffset, buffer.limit)
-    } finally {
-      fos.close()
-    }
-  }
-
-  private def isSnapshotFile(name: String): Boolean = name.endsWith(Log.PidSnapshotFileSuffix)
-
-}
-
-/**
- * Maintains a mapping from ProducerIds (PIDs) to metadata about the last appended entries (e.g.
- * epoch, sequence number, last offset, etc.)
- *
- * The sequence number is the last number successfully appended to the partition for the given identifier.
- * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message
- * appended to the partition.
- *
- * As long as a PID is contained in the map, the corresponding producer can continue to write data.
- * However, PIDs can be expired due to lack of recent use or if the last written entry has been deleted from
- * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure
- * that the most recent entry from a given PID is retained in the log provided it hasn't expired due to
- * age. This ensures that PIDs will not be expired until either the max expiration time has been reached,
- * or if the topic also is configured for deletion, the segment containing the last written offset has
- * been deleted.
- */
-@nonthreadsafe
-class ProducerIdMapping(val config: LogConfig,
-                        val topicPartition: TopicPartition,
-                        val logDir: File,
-                        val maxPidExpirationMs: Int) extends Logging {
-  import ProducerIdMapping._
-
-  private val pidMap = mutable.Map[Long, ProducerIdEntry]()
-  private var lastMapOffset = 0L
-  private var lastSnapOffset = 0L
-
-  /**
-   * Returns the last offset of this map
-   */
-  def mapEndOffset = lastMapOffset
-
-  /**
-   * Get a copy of the active producers
-   */
-  def activePids: immutable.Map[Long, ProducerIdEntry] = pidMap.toMap
-
-  private def loadFromSnapshot(logStartOffset: Long, currentTime: Long) {
-    pidMap.clear()
-
-    while (true) {
-      latestSnapshotFile match {
-        case Some(file) =>
-          try {
-            info(s"Loading PID mapping from snapshot file ${file.getName} for partition $topicPartition")
-            readSnapshot(file).foreach { case (pid, entry) =>
-              if (!isExpired(currentTime, entry))
-                pidMap.put(pid, entry)
-            }
-
-            lastSnapOffset = Log.offsetFromFilename(file.getName)
-            lastMapOffset = lastSnapOffset
-            return
-          } catch {
-            case e: CorruptSnapshotException =>
-              error(s"Snapshot file at ${file.getPath} is corrupt: ${e.getMessage}")
-              Files.deleteIfExists(file.toPath)
-          }
-        case None =>
-          lastSnapOffset = logStartOffset
-          lastMapOffset = logStartOffset
-          return
-      }
-    }
-  }
-
-  private def isExpired(currentTimeMs: Long, producerIdEntry: ProducerIdEntry) : Boolean =
-    currentTimeMs - producerIdEntry.timestamp >= maxPidExpirationMs
-
-
-  def removeExpiredPids(currentTimeMs: Long) {
-    pidMap.retain { case (pid, lastEntry) =>
-      !isExpired(currentTimeMs, lastEntry)
-    }
-  }
-
-  /**
-   * Truncate the PID mapping to the given offset range and reload the entries from the most recent
-   * snapshot in range (if there is one).
-   */
-  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
-    if (logEndOffset != mapEndOffset) {
-      deleteSnapshotFiles { file =>
-        val offset = Log.offsetFromFilename(file.getName)
-        offset > logEndOffset || offset <= logStartOffset
-      }
-      loadFromSnapshot(logStartOffset, currentTimeMs)
-    } else {
-      expirePids(logStartOffset)
-    }
-  }
-
-  /**
-   * Update the mapping with the given append information
-   */
-  def update(appendInfo: ProducerAppendInfo): Unit = {
-    if (appendInfo.pid == RecordBatch.NO_PRODUCER_ID)
-      throw new IllegalArgumentException("Invalid PID passed to update")
-    val entry = appendInfo.lastEntry
-    pidMap.put(appendInfo.pid, entry)
-  }
-
-  def updateMapEndOffset(lastOffset: Long): Unit = {
-    lastMapOffset = lastOffset
-  }
-
-  /**
-   * Load a previously stored PID entry into the cache. Ignore the entry if the timestamp is older
-   * than the current time minus the PID expiration time (i.e. if the PID has expired).
-   */
-  def load(pid: Long, entry: ProducerIdEntry, currentTimeMs: Long) {
-    if (pid != RecordBatch.NO_PRODUCER_ID && !isExpired(currentTimeMs, entry))
-      pidMap.put(pid, entry)
-  }
-
-  /**
-   * Get the last written entry for the given PID.
-   */
-  def lastEntry(pid: Long): Option[ProducerIdEntry] = pidMap.get(pid)
-
-  /**
-   * Write a new snapshot if there have been updates since the last one.
-   */
-  def maybeTakeSnapshot() {
-    // If not a new offset, then it is not worth taking another snapshot
-    if (lastMapOffset > lastSnapOffset) {
-      val snapshotFile = Log.pidSnapshotFilename(logDir, lastMapOffset)
-      debug(s"Writing producer snapshot for partition $topicPartition at offset $lastMapOffset")
-      writeSnapshot(snapshotFile, pidMap)
-
-      // Update the last snap offset according to the serialized map
-      lastSnapOffset = lastMapOffset
-
-      maybeRemoveOldestSnapshot()
-    }
-  }
-
-  /**
-   * Get the last offset (exclusive) of the latest snapshot file.
-   */
-  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => Log.offsetFromFilename(file.getName))
-
-  /**
-   * When we remove the head of the log due to retention, we need to clean up the id map. This method takes
-   * the new start offset and expires all pids which have a smaller last written offset.
-   */
-  def expirePids(logStartOffset: Long) {
-    pidMap.retain((pid, entry) => entry.lastOffset >= logStartOffset)
-    deleteSnapshotFiles(file => Log.offsetFromFilename(file.getName) <= logStartOffset)
-    if (lastMapOffset < logStartOffset)
-      lastMapOffset = logStartOffset
-    lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
-  }
-
-  /**
-   * Truncate the PID mapping and remove all snapshots. This resets the state of the mapping.
-   */
-  def truncate() {
-    pidMap.clear()
-    deleteSnapshotFiles()
-    lastSnapOffset = 0L
-    lastMapOffset = 0L
-  }
-
-  private def maybeRemoveOldestSnapshot() {
-    val list = listSnapshotFiles
-    if (list.size > maxPidSnapshotsToRetain) {
-      val toDelete = list.minBy(file => Log.offsetFromFilename(file.getName))
-      Files.deleteIfExists(toDelete.toPath)
-    }
-  }
-
-  private def listSnapshotFiles: List[File] = {
-    if (logDir.exists && logDir.isDirectory)
-      logDir.listFiles.filter(f => f.isFile && isSnapshotFile(f.getName)).toList
-    else
-      List.empty[File]
-  }
-
-  private def latestSnapshotFile: Option[File] = {
-    val files = listSnapshotFiles
-    if (files.nonEmpty)
-      Some(files.maxBy(file => Log.offsetFromFilename(file.getName)))
-    else
-      None
-  }
-
-  private def deleteSnapshotFiles(predicate: File => Boolean = _ => true) {
-    listSnapshotFiles.filter(predicate).foreach(file => Files.deleteIfExists(file.toPath))
-  }
-
-}
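
The epoch/sequence checks in validateAppend above (kept, in extended form, in the new ProducerStateManager below) reduce to a few rules: an older epoch is fenced, a newer epoch must restart at sequence 0, a resend of the last batch is a duplicate, and anything that does not continue at lastSeq + 1 is out of order. The following standalone illustration uses hypothetical names and omits the NO_PRODUCER_EPOCH bootstrap case; it is not code from the commit.

object SequenceValidationSketch {
  final case class LastState(epoch: Int, firstSeq: Int, lastSeq: Int)

  sealed trait Result
  case object Ok extends Result
  case object Fenced extends Result      // request epoch is older than the server epoch
  case object Duplicate extends Result   // identical to the last appended batch
  case object OutOfOrder extends Result  // gap in the sequence, or a new epoch not starting at 0

  def validate(state: LastState, epoch: Int, firstSeq: Int, lastSeq: Int): Result =
    if (state.epoch > epoch) Fenced
    else if (state.epoch < epoch) { if (firstSeq != 0) OutOfOrder else Ok }
    else if (firstSeq == state.firstSeq && lastSeq == state.lastSeq) Duplicate
    else if (firstSeq != state.lastSeq + 1) OutOfOrder
    else Ok

  def main(args: Array[String]): Unit = {
    val state = LastState(epoch = 5, firstSeq = 10, lastSeq = 14)
    println(validate(state, 5, 15, 20))  // Ok: continues at lastSeq + 1
    println(validate(state, 5, 10, 14))  // Duplicate
    println(validate(state, 4, 15, 20))  // Fenced
    println(validate(state, 5, 17, 20))  // OutOfOrder: gap after sequence 14
  }
}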

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
new file mode 100644
index 0000000..b1a43d2
--- /dev/null
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.file.Files
+
+import kafka.common.KafkaException
+import kafka.log.Log.offsetFromFilename
+import kafka.server.LogOffsetMetadata
+import kafka.utils.{Logging, nonthreadsafe, threadsafe}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.common.protocol.types._
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
+
+import scala.collection.mutable.ListBuffer
+import scala.collection.{immutable, mutable}
+
+class CorruptSnapshotException(msg: String) extends KafkaException(msg)
+
+private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffsetMetadata, var lastOffset: Option[Long] = None) {
+  def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset))
+}
+
+private[log] object ProducerIdEntry {
+  val Empty = ProducerIdEntry(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+    -1, 0, RecordBatch.NO_TIMESTAMP, -1, None)
+}
+
+private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short, lastSeq: Int, lastOffset: Long,
+                                        offsetDelta: Int, timestamp: Long, coordinatorEpoch: Int,
+                                        currentTxnFirstOffset: Option[Long]) {
+  def firstSeq: Int = lastSeq - offsetDelta
+  def firstOffset: Long = lastOffset - offsetDelta
+
+  def isDuplicate(batch: RecordBatch): Boolean = {
+    batch.producerEpoch == producerEpoch &&
+      batch.baseSequence == firstSeq &&
+      batch.lastSequence == lastSeq
+  }
+}
+
+/**
+ * This class is used to validate the records appended by a given producer before they are written to the log.
+ * It is initialized with the producer's state after the last successful append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: ProducerIdEntry, loadingFromLog: Boolean = false) {
+  private var producerEpoch = initialEntry.producerEpoch
+  private var firstSeq = initialEntry.firstSeq
+  private var lastSeq = initialEntry.lastSeq
+  private var lastOffset = initialEntry.lastOffset
+  private var maxTimestamp = initialEntry.timestamp
+  private var currentTxnFirstOffset = initialEntry.currentTxnFirstOffset
+  private var coordinatorEpoch = initialEntry.coordinatorEpoch
+  private val transactions = ListBuffer.empty[TxnMetadata]
+
+  def this(pid: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) =
+    this(pid, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
+
+  private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int) = {
+    if (this.producerEpoch > epoch) {
+      throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
+        s"with a newer epoch. $epoch (request epoch), ${this.producerEpoch} (server epoch)")
+    } else if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < epoch) {
+      if (firstSeq != 0)
+        throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " +
+          s"(request epoch), $firstSeq (seq. number)")
+    } else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
+      // the epoch was bumped by a control record, so we expect the sequence number to be reset
+      throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), found $firstSeq " +
+        s"(incoming seq. number), but expected 0")
+    } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
+      throw new DuplicateSequenceNumberException(s"Duplicate sequence number: pid: $producerId, (incomingBatch.firstSeq, " +
+        s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), (lastEntry.firstSeq, lastEntry.lastSeq): " +
+        s"(${this.firstSeq}, ${this.lastSeq}).")
+    } else if (firstSeq != this.lastSeq + 1L) {
+      throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), $firstSeq " +
+        s"(incoming seq. number), ${this.lastSeq} (current end sequence number)")
+    }
+  }
+
+  def append(batch: RecordBatch): Option[CompletedTxn] = {
+    if (batch.isControlBatch) {
+      val record = batch.iterator.next()
+      val endTxnMarker = EndTransactionMarker.deserialize(record)
+      val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp)
+      Some(completedTxn)
+    } else {
+      append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset,
+        batch.isTransactional)
+      None
+    }
+  }
+
+  def append(epoch: Short,
+             firstSeq: Int,
+             lastSeq: Int,
+             lastTimestamp: Long,
+             lastOffset: Long,
+             isTransactional: Boolean): Unit = {
+    if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog)
+      // skip validation if this is the first entry when loading from the log. Log retention
+      // will generally have removed the beginning entries from each PID
+      validateAppend(epoch, firstSeq, lastSeq)
+
+    this.producerEpoch = epoch
+    this.firstSeq = firstSeq
+    this.lastSeq = lastSeq
+    this.maxTimestamp = lastTimestamp
+    this.lastOffset = lastOffset
+
+    if (currentTxnFirstOffset.isDefined && !isTransactional)
+      throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId")
+
+    if (isTransactional && currentTxnFirstOffset.isEmpty) {
+      val firstOffset = lastOffset - (lastSeq - firstSeq)
+      currentTxnFirstOffset = Some(firstOffset)
+      transactions += new TxnMetadata(producerId, firstOffset)
+    }
+  }
+
+  def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
+                         producerEpoch: Short,
+                         offset: Long,
+                         timestamp: Long): CompletedTxn = {
+    if (this.producerEpoch > producerEpoch)
+      throw new ProducerFencedException(s"Invalid producer epoch: $producerEpoch (zombie): ${this.producerEpoch} (current)")
+
+    if (this.coordinatorEpoch > endTxnMarker.coordinatorEpoch)
+      throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch: ${endTxnMarker.coordinatorEpoch} " +
+        s"(zombie), $coordinatorEpoch (current)")
+
+    if (producerEpoch > this.producerEpoch) {
+      // it is possible that this control record is the first record seen from a new epoch (the producer
+      // may fail before sending to the partition or the request itself could fail for some reason). In this
+      // case, we bump the epoch and reset the sequence numbers
+      this.producerEpoch = producerEpoch
+      this.firstSeq = RecordBatch.NO_SEQUENCE
+      this.lastSeq = RecordBatch.NO_SEQUENCE
+    } else {
+      // the control record is the last append to the log, so the last offset will be updated to point to it.
+      // However, the sequence numbers still point to the previous batch, so the duplicate check would no longer
+      // be correct: it would return the wrong offset. To fix this, we treat the control record as a batch
+      // of size 1 which uses the last appended sequence number.
+      this.firstSeq = this.lastSeq
+    }
+
+    val firstOffset = currentTxnFirstOffset match {
+      case Some(firstOffset) => firstOffset
+      case None =>
+        transactions += new TxnMetadata(producerId, offset)
+        offset
+    }
+
+    this.lastOffset = offset
+    this.currentTxnFirstOffset = None
+    this.maxTimestamp = timestamp
+    this.coordinatorEpoch = endTxnMarker.coordinatorEpoch
+    CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)
+  }
+
+  def lastEntry: ProducerIdEntry =
+    ProducerIdEntry(producerId, producerEpoch, lastSeq, lastOffset, lastSeq - firstSeq, maxTimestamp,
+      coordinatorEpoch, currentTxnFirstOffset)
+
+  def startedTransactions: List[TxnMetadata] = transactions.toList
+
+  def maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata: LogOffsetMetadata): Unit = {
+    // we will cache the log offset metadata if it corresponds to the starting offset of
+    // the last transaction that was started. This is optimized for leader appends where it
+    // is only possible to have one transaction started for each log append, and the log
+    // offset metadata will always match in that case since no data from other producers
+    // is mixed into the append
+    transactions.headOption.foreach { txn =>
+      if (txn.firstOffset.messageOffset == logOffsetMetadata.messageOffset)
+        txn.firstOffset = logOffsetMetadata
+    }
+  }
+
+}
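
To make the transaction bookkeeping in ProducerAppendInfo above easier to follow, here is a standalone sketch with hypothetical names (not taken from the commit): the first transactional batch records the transaction's first offset, later batches leave it untouched, and the end marker clears it and yields a completed-transaction summary.

object TxnTrackingSketch {
  final case class CompletedTxnSketch(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean)

  final class OpenTxnTracker(producerId: Long) {
    private var currentTxnFirstOffset: Option[Long] = None

    // Only the first batch of an open transaction sets the first offset.
    def onTransactionalAppend(batchFirstOffset: Long): Unit =
      if (currentTxnFirstOffset.isEmpty)
        currentTxnFirstOffset = Some(batchFirstOffset)

    // The end marker closes the transaction; an empty transaction spans only the marker itself.
    def onEndMarker(markerOffset: Long, isAbort: Boolean): CompletedTxnSketch = {
      val firstOffset = currentTxnFirstOffset.getOrElse(markerOffset)
      currentTxnFirstOffset = None
      CompletedTxnSketch(producerId, firstOffset, markerOffset, isAbort)
    }
  }

  def main(args: Array[String]): Unit = {
    val tracker = new OpenTxnTracker(producerId = 1L)
    tracker.onTransactionalAppend(batchFirstOffset = 100L)
    tracker.onTransactionalAppend(batchFirstOffset = 105L)  // ignored: the transaction is already open
    println(tracker.onEndMarker(markerOffset = 110L, isAbort = true))
    // prints CompletedTxnSketch(1,100,110,true)
  }
}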
+
+object ProducerStateManager {
+  private val PidSnapshotVersion: Short = 1
+  private val VersionField = "version"
+  private val CrcField = "crc"
+  private val PidField = "pid"
+  private val LastSequenceField = "last_sequence"
+  private val ProducerEpochField = "epoch"
+  private val LastOffsetField = "last_offset"
+  private val OffsetDeltaField = "offset_delta"
+  private val TimestampField = "timestamp"
+  private val PidEntriesField = "pid_entries"
+  private val CoordinatorEpochField = "coordinator_epoch"
+  private val CurrentTxnFirstOffsetField = "current_txn_first_offset"
+
+  private val VersionOffset = 0
+  private val CrcOffset = VersionOffset + 2
+  private val PidEntriesOffset = CrcOffset + 4
+
+  val PidSnapshotEntrySchema = new Schema(
+    new Field(PidField, Type.INT64, "The producer ID"),
+    new Field(ProducerEpochField, Type.INT16, "Current epoch of the producer"),
+    new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"),
+    new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"),
+    new Field(OffsetDeltaField, Type.INT32, "The difference of the last sequence and first sequence in the last written batch"),
+    new Field(TimestampField, Type.INT64, "Max timestamp from the last written entry"),
+    new Field(CoordinatorEpochField, Type.INT32, "The epoch of the last transaction coordinator to send an end transaction marker"),
+    new Field(CurrentTxnFirstOffsetField, Type.INT64, "The first offset of the on-going transaction (-1 if there is none)"))
+  val PidSnapshotMapSchema = new Schema(
+    new Field(VersionField, Type.INT16, "Version of the snapshot file"),
+    new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
+    new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table"))
+
+  def readSnapshot(file: File): Iterable[ProducerIdEntry] = {
+    val buffer = Files.readAllBytes(file.toPath)
+    val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
+
+    val version = struct.getShort(VersionField)
+    if (version != PidSnapshotVersion)
+      throw new IllegalArgumentException(s"Unhandled snapshot file version $version")
+
+    val crc = struct.getUnsignedInt(CrcField)
+    val computedCrc =  Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset)
+    if (crc != computedCrc)
+      throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). " +
+        s"Stored crc: $crc. Computed crc: $computedCrc")
+
+    struct.getArray(PidEntriesField).map { pidEntryObj =>
+      val pidEntryStruct = pidEntryObj.asInstanceOf[Struct]
+      val pid: Long = pidEntryStruct.getLong(PidField)
+      val epoch = pidEntryStruct.getShort(ProducerEpochField)
+      val seq = pidEntryStruct.getInt(LastSequenceField)
+      val offset = pidEntryStruct.getLong(LastOffsetField)
+      val timestamp = pidEntryStruct.getLong(TimestampField)
+      val offsetDelta = pidEntryStruct.getInt(OffsetDeltaField)
+      val coordinatorEpoch = pidEntryStruct.getInt(CoordinatorEpochField)
+      val currentTxnFirstOffset = pidEntryStruct.getLong(CurrentTxnFirstOffsetField)
+      val newEntry = ProducerIdEntry(pid, epoch, seq, offset, offsetDelta, timestamp,
+        coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
+      newEntry
+    }
+  }
+
+  private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) {
+    val struct = new Struct(PidSnapshotMapSchema)
+    struct.set(VersionField, PidSnapshotVersion)
+    struct.set(CrcField, 0L) // we'll fill this after writing the entries
+    val entriesArray = entries.map {
+      case (pid, entry) =>
+        val pidEntryStruct = struct.instance(PidEntriesField)
+        pidEntryStruct.set(PidField, pid)
+          .set(ProducerEpochField, entry.producerEpoch)
+          .set(LastSequenceField, entry.lastSeq)
+          .set(LastOffsetField, entry.lastOffset)
+          .set(OffsetDeltaField, entry.offsetDelta)
+          .set(TimestampField, entry.timestamp)
+          .set(CoordinatorEpochField, entry.coordinatorEpoch)
+          .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
+        pidEntryStruct
+    }.toArray
+    struct.set(PidEntriesField, entriesArray)
+
+    val buffer = ByteBuffer.allocate(struct.sizeOf)
+    struct.writeTo(buffer)
+    buffer.flip()
+
+    // now fill in the CRC
+    val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset)
+    ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
+
+    val fos = new FileOutputStream(file)
+    try {
+      fos.write(buffer.array, buffer.arrayOffset, buffer.limit)
+    } finally {
+      fos.close()
+    }
+  }
+
+  private def isSnapshotFile(name: String): Boolean = name.endsWith(Log.PidSnapshotFileSuffix)
+
+}
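
The snapshot framing used by writeSnapshot/readSnapshot above is a 2-byte version, a 4-byte CRC, and then the serialized entries, with the CRC computed over everything after the 6-byte header. Below is a standalone sketch of that framing only (hypothetical names), with java.util.zip.CRC32C (Java 9+) standing in for Kafka's Crc32C utility.

import java.nio.ByteBuffer
import java.util.zip.CRC32C

object SnapshotFramingSketch {
  val VersionOffset = 0
  val CrcOffset = VersionOffset + 2      // the version is a 2-byte field
  val EntriesOffset = CrcOffset + 4      // the CRC is a 4-byte unsigned field

  // Frame serialized entries with a version and a CRC computed over the entry bytes only.
  def frame(version: Int, entries: Array[Byte]): ByteBuffer = {
    val buffer = ByteBuffer.allocate(EntriesOffset + entries.length)
    buffer.putShort(VersionOffset, version.toShort)
    buffer.position(EntriesOffset)
    buffer.put(entries)
    val crc = new CRC32C
    crc.update(buffer.array, EntriesOffset, entries.length)
    buffer.putInt(CrcOffset, crc.getValue.toInt)  // stored and read back as an unsigned value
    buffer.rewind()
    buffer
  }

  // Recompute the CRC over everything after the 6-byte header and compare with the stored value.
  def isValid(buffer: ByteBuffer): Boolean = {
    val stored = java.lang.Integer.toUnsignedLong(buffer.getInt(CrcOffset))
    val crc = new CRC32C
    crc.update(buffer.array, EntriesOffset, buffer.limit - EntriesOffset)
    stored == crc.getValue
  }

  def main(args: Array[String]): Unit = {
    val framed = frame(version = 1, entries = "entry-bytes".getBytes("UTF-8"))
    println(isValid(framed))  // true
  }
}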
+
+/**
+ * Maintains a mapping from ProducerIds (PIDs) to metadata about the last appended entries (e.g.
+ * epoch, sequence number, last offset, etc.)
+ *
+ * The sequence number is the last number successfully appended to the partition for the given identifier.
+ * The epoch is used for fencing against zombie writers. The offset is that of the last message successfully
+ * appended to the partition.
+ *
+ * As long as a PID is contained in the map, the corresponding producer can continue to write data.
+ * However, PIDs can be expired due to lack of recent use or if the last written entry has been deleted from
+ * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure
+ * that the most recent entry from a given PID is retained in the log provided it hasn't expired due to
+ * age. This ensures that a PID will not be expired until either the max expiration time has been reached
+ * or, if the topic is also configured for deletion, the segment containing its last written offset has
+ * been deleted.
+ */
+@nonthreadsafe
+class ProducerStateManager(val topicPartition: TopicPartition,
+                           val logDir: File,
+                           val maxPidExpirationMs: Int = 60 * 60 * 1000) extends Logging {
+  import ProducerStateManager._
+  import java.util
+
+  private val producers = mutable.Map.empty[Long, ProducerIdEntry]
+  private var lastMapOffset = 0L
+  private var lastSnapOffset = 0L
+
+  // ongoing transactions sorted by the first offset of the transaction
+  private val ongoingTxns = new util.TreeMap[Long, TxnMetadata]
+
+  // completed transactions whose markers are at offsets above the high watermark
+  private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
+
+  /**
+   * An unstable offset is one which is either undecided (i.e. its ultimate outcome is not yet known),
+   * or one that is decided, but may not have been replicated (i.e. any transaction which has a COMMIT/ABORT
+   * marker written at a higher offset than the current high watermark).
+   */
+  def firstUnstableOffset: Option[LogOffsetMetadata] = {
+    val unreplicatedFirstOffset = Option(unreplicatedTxns.firstEntry).map(_.getValue.firstOffset)
+    val undecidedFirstOffset = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset)
+    if (unreplicatedFirstOffset.isEmpty)
+      undecidedFirstOffset
+    else if (undecidedFirstOffset.isEmpty)
+      unreplicatedFirstOffset
+    else if (undecidedFirstOffset.get.messageOffset < unreplicatedFirstOffset.get.messageOffset)
+      undecidedFirstOffset
+    else
+      unreplicatedFirstOffset
+  }
+
+  /**
+   * Acknowledge all transactions which have been completed before a given offset. This allows the LSO
+   * to advance to the next unstable offset.
+   */
+  def onHighWatermarkUpdated(highWatermark: Long): Unit = {
+    removeUnreplicatedTransactions(highWatermark)
+  }
+
+  /**
+   * The first undecided offset is the earliest transactional message which has not yet been committed
+   * or aborted.
+   */
+  def firstUndecidedOffset: Option[Long] = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset)
+
+  /**
+   * Returns the last offset of this map
+   */
+  def mapEndOffset = lastMapOffset
+
+  /**
+   * Get a copy of the active producers
+   */
+  def activeProducers: immutable.Map[Long, ProducerIdEntry] = producers.toMap
+
+  def isEmpty: Boolean = producers.isEmpty && unreplicatedTxns.isEmpty
+
+  private def loadFromSnapshot(logStartOffset: Long, currentTime: Long) {
+    while (true) {
+      latestSnapshotFile match {
+        case Some(file) =>
+          try {
+            info(s"Loading producer state from snapshot file ${file.getName} for partition $topicPartition")
+            readSnapshot(file).filter(!isExpired(currentTime, _)).foreach(loadProducerEntry)
+            lastSnapOffset = offsetFromFilename(file.getName)
+            lastMapOffset = lastSnapOffset
+            return
+          } catch {
+            case e: CorruptSnapshotException =>
+              error(s"Snapshot file at ${file.getPath} is corrupt: ${e.getMessage}")
+              Files.deleteIfExists(file.toPath)
+          }
+        case None =>
+          lastSnapOffset = logStartOffset
+          lastMapOffset = logStartOffset
+          return
+      }
+    }
+  }
+
+  // visible for testing
+  private[log] def loadProducerEntry(entry: ProducerIdEntry): Unit = {
+    val pid = entry.producerId
+    producers.put(pid, entry)
+    entry.currentTxnFirstOffset.foreach { offset =>
+      ongoingTxns.put(offset, new TxnMetadata(pid, offset))
+    }
+  }
+
+  private def isExpired(currentTimeMs: Long, producerIdEntry: ProducerIdEntry): Boolean =
+    producerIdEntry.currentTxnFirstOffset.isEmpty && currentTimeMs - producerIdEntry.timestamp >= maxPidExpirationMs
+
+  /**
+   * Expire any PIDs which have been idle longer than the configured maximum expiration timeout.
+   */
+  def removeExpiredProducers(currentTimeMs: Long) {
+    producers.retain { case (pid, lastEntry) =>
+      !isExpired(currentTimeMs, lastEntry)
+    }
+  }
+
+  /**
+   * Truncate the PID mapping to the given offset range and reload the entries from the most recent
+   * snapshot in range (if there is one). Note that the log end offset is assumed to be less than
+   * or equal to the high watermark.
+   */
+  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
+    if (logEndOffset != mapEndOffset) {
+      producers.clear()
+      ongoingTxns.clear()
+
+      // since we assume that the offset is less than or equal to the high watermark, it is
+      // safe to clear the unreplicated transactions
+      unreplicatedTxns.clear()
+      deleteSnapshotFiles { file =>
+        val offset = offsetFromFilename(file.getName)
+        offset > logEndOffset || offset <= logStartOffset
+      }
+      loadFromSnapshot(logStartOffset, currentTimeMs)
+    } else {
+      evictUnretainedProducers(logStartOffset)
+    }
+  }
+
+  /**
+   * Update the mapping with the given append information
+   */
+  def update(appendInfo: ProducerAppendInfo): Unit = {
+    if (appendInfo.producerId == RecordBatch.NO_PRODUCER_ID)
+      throw new IllegalArgumentException("Invalid PID passed to update")
+
+    val entry = appendInfo.lastEntry
+    producers.put(appendInfo.producerId, entry)
+    appendInfo.startedTransactions.foreach { txn =>
+      ongoingTxns.put(txn.firstOffset.messageOffset, txn)
+    }
+  }
+
+  def updateMapEndOffset(lastOffset: Long): Unit = {
+    lastMapOffset = lastOffset
+  }
+
+  /**
+   * Get the last written entry for the given PID.
+   */
+  def lastEntry(producerId: Long): Option[ProducerIdEntry] = producers.get(producerId)
+
+  /**
+   * Take a snapshot at the current end offset if one does not already exist.
+   */
+  def takeSnapshot(): Unit = {
+    // If not a new offset, then it is not worth taking another snapshot
+    if (lastMapOffset > lastSnapOffset) {
+      val snapshotFile = Log.producerSnapshotFile(logDir, lastMapOffset)
+      debug(s"Writing producer snapshot for partition $topicPartition at offset $lastMapOffset")
+      writeSnapshot(snapshotFile, producers)
+
+      // Update the last snap offset according to the serialized map
+      lastSnapOffset = lastMapOffset
+    }
+  }
+
+  /**
+   * Get the last offset (exclusive) of the latest snapshot file.
+   */
+  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFilename(file.getName))
+
+  /**
+   * Get the last offset (exclusive) of the oldest snapshot file.
+   */
+  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFilename(file.getName))
+
+  /**
+   * When we remove the head of the log due to retention, we need to clean up the id map. This method takes
+   * the new start offset and removes all pids which have a smaller last written offset.
+   */
+  def evictUnretainedProducers(logStartOffset: Long) {
+    val evictedProducerEntries = producers.filter(_._2.lastOffset < logStartOffset)
+    val evictedProducerIds = evictedProducerEntries.keySet
+
+    producers --= evictedProducerIds
+    removeEvictedOngoingTransactions(evictedProducerIds)
+    removeUnreplicatedTransactions(logStartOffset)
+
+    deleteSnapshotFiles(file => offsetFromFilename(file.getName) <= logStartOffset)
+    if (lastMapOffset < logStartOffset)
+      lastMapOffset = logStartOffset
+    lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
+  }
+
+  private def removeEvictedOngoingTransactions(expiredProducerIds: collection.Set[Long]): Unit = {
+    val iterator = ongoingTxns.entrySet.iterator
+    while (iterator.hasNext) {
+      val txnEntry = iterator.next()
+      if (expiredProducerIds.contains(txnEntry.getValue.producerId))
+        iterator.remove()
+    }
+  }
+
+  private def removeUnreplicatedTransactions(offset: Long): Unit = {
+    val iterator = unreplicatedTxns.entrySet.iterator
+    while (iterator.hasNext) {
+      val txnEntry = iterator.next()
+      val lastOffset = txnEntry.getValue.lastOffset
+      if (lastOffset.exists(_ < offset))
+        iterator.remove()
+    }
+  }
+
+  /**
+   * Truncate the PID mapping and remove all snapshots. This resets the state of the mapping.
+   */
+  def truncate() {
+    producers.clear()
+    ongoingTxns.clear()
+    unreplicatedTxns.clear()
+    deleteSnapshotFiles()
+    lastSnapOffset = 0L
+    lastMapOffset = 0L
+  }
+
+  /**
+   * Complete the transaction and return the last stable offset.
+   */
+  def completeTxn(completedTxn: CompletedTxn): Long = {
+    val txnMetadata = ongoingTxns.remove(completedTxn.firstOffset)
+    if (txnMetadata == null)
+      throw new IllegalArgumentException("Attempted to complete a transaction which was not started")
+
+    txnMetadata.lastOffset = Some(completedTxn.lastOffset)
+    unreplicatedTxns.put(completedTxn.firstOffset, txnMetadata)
+
+    val lastStableOffset = firstUndecidedOffset.getOrElse(completedTxn.lastOffset + 1)
+    lastStableOffset
+  }
+
+  @threadsafe
+  def deleteSnapshotsBefore(offset: Long): Unit = {
+    deleteSnapshotFiles(file => offsetFromFilename(file.getName) < offset)
+  }
+
+  private def listSnapshotFiles: List[File] = {
+    if (logDir.exists && logDir.isDirectory)
+      logDir.listFiles.filter(f => f.isFile && isSnapshotFile(f.getName)).toList
+    else
+      List.empty[File]
+  }
+
+  private def oldestSnapshotFile: Option[File] = {
+    val files = listSnapshotFiles
+    if (files.nonEmpty)
+      Some(files.minBy(file => offsetFromFilename(file.getName)))
+    else
+      None
+  }
+
+  private def latestSnapshotFile: Option[File] = {
+    val files = listSnapshotFiles
+    if (files.nonEmpty)
+      Some(files.maxBy(file => offsetFromFilename(file.getName)))
+    else
+      None
+  }
+
+  private def deleteSnapshotFiles(predicate: File => Boolean = _ => true) {
+    listSnapshotFiles.filter(predicate).foreach(file => Files.deleteIfExists(file.toPath))
+  }
+
+}
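
The interplay of ongoingTxns, unreplicatedTxns, and firstUnstableOffset above can be shown with a small standalone sketch (hypothetical names, not from the commit): completing a transaction only moves it to the unreplicated set, and the first unstable offset advances once the high watermark passes the transaction's marker.

import java.util.TreeMap

object UnstableOffsetSketch {
  final case class Txn(producerId: Long, firstOffset: Long, var lastOffset: Option[Long] = None)

  private val ongoingTxns = new TreeMap[Long, Txn]
  private val unreplicatedTxns = new TreeMap[Long, Txn]

  // The first unstable offset is the smaller of the earliest ongoing transaction and the
  // earliest completed transaction whose marker is not yet replicated.
  def firstUnstableOffset: Option[Long] = {
    val undecided = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset)
    val unreplicated = Option(unreplicatedTxns.firstEntry).map(_.getValue.firstOffset)
    (undecided.toList ++ unreplicated.toList).reduceOption(_ min _)
  }

  // Completing a transaction moves it from "ongoing" to "completed but not yet replicated".
  def completeTxn(firstOffset: Long, markerOffset: Long): Unit = {
    val txn = ongoingTxns.remove(firstOffset)
    txn.lastOffset = Some(markerOffset)
    unreplicatedTxns.put(firstOffset, txn)
  }

  // Once the high watermark passes a transaction's marker, the transaction becomes stable.
  def onHighWatermarkUpdated(highWatermark: Long): Unit = {
    val it = unreplicatedTxns.entrySet.iterator
    while (it.hasNext) {
      if (it.next().getValue.lastOffset.exists(_ < highWatermark)) it.remove()
    }
  }

  def main(args: Array[String]): Unit = {
    ongoingTxns.put(50L, Txn(1L, 50L))
    ongoingTxns.put(80L, Txn(2L, 80L))
    println(firstUnstableOffset)            // Some(50)
    completeTxn(firstOffset = 50L, markerOffset = 90L)
    println(firstUnstableOffset)            // Some(50): the marker is not yet replicated
    onHighWatermarkUpdated(highWatermark = 95L)
    println(firstUnstableOffset)            // Some(80): only the second transaction remains unstable
  }
}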

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 731b173..19ab71a 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -144,7 +144,7 @@ class TimeIndex(file: File,
   def lookup(targetTimestamp: Long): TimestampOffset = {
     maybeLock(lock) {
       val idx = mmap.duplicate
-      val slot = indexSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
+      val slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
       if (slot == -1)
         TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
       else {
@@ -163,7 +163,7 @@ class TimeIndex(file: File,
   override def truncateTo(offset: Long) {
     inLock(lock) {
       val idx = mmap.duplicate
-      val slot = indexSlotFor(idx, offset, IndexSearchType.VALUE)
+      val slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.VALUE)
 
       /* There are 3 cases for choosing the new size
        * 1) if there is no entry in the index <= the offset, delete everything
@@ -206,4 +206,5 @@ class TimeIndex(file: File,
       "Time index file " + file.getAbsolutePath + " is corrupt, found " + len +
           " bytes which is not positive or not a multiple of 12.")
   }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/TransactionIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
new file mode 100644
index 0000000..bf6a6d4
--- /dev/null
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
+import java.nio.file.StandardOpenOption
+
+import kafka.utils.{Logging, nonthreadsafe}
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.mutable.ListBuffer
+
+private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTransaction], isComplete: Boolean)
+
+/**
+ * The transaction index maintains metadata about the aborted transactions for each segment. This includes
+ * the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of
+ * the abort. This index is used to find the aborted transactions in the range of a given fetch request at
+ * the READ_COMMITTED isolation level.
+ *
+ * There is at most one transaction index for each log segment. The entries correspond to the transactions
+ * whose commit markers were written in the corresponding log segment. Note, however, that individual transactions
+ * may span multiple segments. Recovering the index therefore requires scanning the earlier segments in
+ * order to find the start of the transactions.
+ */
+@nonthreadsafe
+class TransactionIndex(val startOffset: Long, @volatile var file: File) extends Logging {
+  // note that the file is not created until we need it
+  @volatile private var maybeChannel: Option[FileChannel] = None
+  private var lastOffset: Option[Long] = None
+
+  if (file.exists)
+    openChannel()
+
+  def append(abortedTxn: AbortedTxn): Unit = {
+    lastOffset.foreach { offset =>
+      if (offset >= abortedTxn.lastOffset)
+        throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially")
+    }
+    lastOffset = Some(abortedTxn.lastOffset)
+    Utils.writeFully(channel, abortedTxn.buffer.duplicate())
+  }
+
+  def flush(): Unit = maybeChannel.foreach(_.force(true))
+
+  def delete(): Boolean = {
+    maybeChannel.forall { channel =>
+      channel.force(true)
+      close()
+      file.delete()
+    }
+  }
+
+  private def channel: FileChannel = {
+    maybeChannel match {
+      case Some(channel) => channel
+      case None => openChannel()
+    }
+  }
+
+  private def openChannel(): FileChannel = {
+    val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE,
+      StandardOpenOption.CREATE)
+    maybeChannel = Some(channel)
+    channel.position(channel.size)
+    channel
+  }
+
+  def truncate() = {
+    maybeChannel.foreach(_.truncate(0))
+    lastOffset = None
+  }
+
+  def close(): Unit = {
+    maybeChannel.foreach(_.close())
+    maybeChannel = None
+  }
+
+  def renameTo(f: File): Unit = {
+    try {
+      if (file.exists)
+        Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+    } finally file = f
+  }
+
+  def truncateTo(offset: Long): Unit = {
+    val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
+    var newLastOffset: Option[Long] = None
+    for ((abortedTxn, position) <- iterator(() => buffer)) {
+      if (abortedTxn.lastOffset >= offset) {
+        channel.truncate(position)
+        lastOffset = newLastOffset
+        return
+      }
+      newLastOffset = Some(abortedTxn.lastOffset)
+    }
+  }
+
+  private def iterator(allocate: () => ByteBuffer): Iterator[(AbortedTxn, Int)] = {
+    maybeChannel match {
+      case None => Iterator.empty
+      case Some(channel) =>
+        var position = 0
+
+        new Iterator[(AbortedTxn, Int)] {
+          override def hasNext: Boolean = channel.position - position >= AbortedTxn.TotalSize
+
+          override def next(): (AbortedTxn, Int) = {
+            try {
+              val buffer = allocate()
+              Utils.readFully(channel, buffer, position)
+              buffer.flip()
+
+              val abortedTxn = new AbortedTxn(buffer)
+              if (abortedTxn.version > AbortedTxn.CurrentVersion)
+                throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version}, " +
+                  s"current version is ${AbortedTxn.CurrentVersion}")
+              val nextEntry = (abortedTxn, position)
+              position += AbortedTxn.TotalSize
+              nextEntry
+            } catch {
+              case e: IOException =>
+                // We received an unexpected error reading from the index file. We propagate this as an
+                // UNKNOWN error to the consumer, which will cause it to retry the fetch.
+                throw new KafkaException(s"Failed to read from the transaction index $file", e)
+            }
+          }
+        }
+    }
+  }
+
+  def allAbortedTxns: List[AbortedTxn] = {
+    iterator(() => ByteBuffer.allocate(AbortedTxn.TotalSize)).map(_._1).toList
+  }
+
+  /**
+   * Collect all aborted transactions which overlap with a given fetch range.
+   *
+   * @param fetchOffset Inclusive first offset of the fetch range
+   * @param upperBoundOffset Exclusive last offset in the fetch range
+   * @return An object containing the aborted transactions and whether the search needs to continue
+   *         into the next log segment.
+   */
+  def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = {
+    val abortedTransactions = ListBuffer.empty[AbortedTransaction]
+    val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
+    for ((abortedTxn, _) <- iterator(() => buffer)) {
+      if (abortedTxn.lastOffset >= fetchOffset && abortedTxn.firstOffset < upperBoundOffset)
+        abortedTransactions += abortedTxn.asAbortedTransaction
+
+      if (abortedTxn.lastStableOffset >= upperBoundOffset)
+        return TxnIndexSearchResult(abortedTransactions.toList, isComplete = true)
+    }
+    TxnIndexSearchResult(abortedTransactions.toList, isComplete = false)
+  }
+
+  def sanityCheck(): Unit = {
+    val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
+    for ((abortedTxn, _) <- iterator(() => buffer)) {
+      require(abortedTxn.lastOffset >= startOffset)
+    }
+  }
+
+}
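
As an illustration of collectAbortedTxns above (standalone, hypothetical names): an entry is collected if it overlaps the fetch range [fetchOffset, upperBoundOffset), and the scan stops early once an entry's last stable offset reaches the upper bound, because every transaction aborted after that point must begin at or beyond it; otherwise the caller continues with the next segment's index.

object AbortedTxnScanSketch {
  final case class AbortedTxnSketch(producerId: Long, firstOffset: Long, lastOffset: Long, lastStableOffset: Long)
  final case class ScanResult(overlapping: List[AbortedTxnSketch], isComplete: Boolean)

  def collect(entries: List[AbortedTxnSketch], fetchOffset: Long, upperBoundOffset: Long): ScanResult = {
    val collected = scala.collection.mutable.ListBuffer.empty[AbortedTxnSketch]
    for (txn <- entries) {
      // overlap test: the aborted range must intersect [fetchOffset, upperBoundOffset)
      if (txn.lastOffset >= fetchOffset && txn.firstOffset < upperBoundOffset)
        collected += txn
      // stop condition: nothing aborted later can still fall inside the fetch range
      if (txn.lastStableOffset >= upperBoundOffset)
        return ScanResult(collected.toList, isComplete = true)
    }
    ScanResult(collected.toList, isComplete = false)  // caller must continue with the next segment's index
  }

  def main(args: Array[String]): Unit = {
    val entries = List(
      AbortedTxnSketch(1L, firstOffset = 10L, lastOffset = 40L, lastStableOffset = 10L),
      AbortedTxnSketch(2L, firstOffset = 50L, lastOffset = 90L, lastStableOffset = 50L),
      AbortedTxnSketch(3L, firstOffset = 120L, lastOffset = 150L, lastStableOffset = 120L))
    println(collect(entries, fetchOffset = 45L, upperBoundOffset = 100L))
    // prints ScanResult(List(AbortedTxnSketch(2,50,90,50)),true)
  }
}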
+
+private[log] object AbortedTxn {
+  val VersionOffset = 0
+  val VersionSize = 2
+  val ProducerIdOffset = VersionOffset + VersionSize
+  val ProducerIdSize = 8
+  val FirstOffsetOffset = ProducerIdOffset + ProducerIdSize
+  val FirstOffsetSize = 8
+  val LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize
+  val LastOffsetSize = 8
+  val LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize
+  val LastStableOffsetSize = 8
+  val TotalSize = LastStableOffsetOffset + LastStableOffsetSize
+
+  val CurrentVersion: Short = 0
+}
+
+private[log] class AbortedTxn(val buffer: ByteBuffer) {
+  import AbortedTxn._
+
+  def this(producerId: Long,
+           firstOffset: Long,
+           lastOffset: Long,
+           lastStableOffset: Long) = {
+    this(ByteBuffer.allocate(AbortedTxn.TotalSize))
+    buffer.putShort(CurrentVersion)
+    buffer.putLong(producerId)
+    buffer.putLong(firstOffset)
+    buffer.putLong(lastOffset)
+    buffer.putLong(lastStableOffset)
+    buffer.flip()
+  }
+
+  def this(completedTxn: CompletedTxn, lastStableOffset: Long) =
+    this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset)
+
+  def version: Short = buffer.get(VersionOffset)
+
+  def producerId: Long = buffer.getLong(ProducerIdOffset)
+
+  def firstOffset: Long = buffer.getLong(FirstOffsetOffset)
+
+  def lastOffset: Long = buffer.getLong(LastOffsetOffset)
+
+  def lastStableOffset: Long = buffer.getLong(LastStableOffsetOffset)
+
+  def asAbortedTransaction: AbortedTransaction = new AbortedTransaction(producerId, firstOffset)
+
+  override def toString: String =
+    s"AbortedTxn(version=$version, producerId=$producerId, firstOffset=$firstOffset, " +
+      s"lastOffset=$lastOffset, lastStableOffset=$lastStableOffset)"
+
+  override def equals(any: Any): Boolean = {
+    any match {
+      case that: AbortedTxn => this.buffer.equals(that.buffer)
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = buffer.hashCode
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index cbee78a..8c4731a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -23,6 +23,7 @@ import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.requests.IsolationLevel
 
 import scala.collection._
 
@@ -45,9 +46,11 @@ case class FetchMetadata(fetchMinBytes: Int,
                          fetchPartitionStatus: Seq[(TopicPartition, FetchPartitionStatus)]) {
 
   override def toString = "[minBytes: " + fetchMinBytes + ", " +
-                          "onlyLeader:" + fetchOnlyLeader + ", "
-                          "onlyCommitted: " + fetchOnlyCommitted + ", "
-                          "partitionStatus: " + fetchPartitionStatus + "]"
+    "maxBytes:" + fetchMaxBytes + ", " +
+    "onlyLeader:" + fetchOnlyLeader + ", " +
+    "onlyCommitted: " + fetchOnlyCommitted + ", " +
+    "replicaId: " + replicaId + ", " +
+    "partitionStatus: " + fetchPartitionStatus + "]"
 }
 /**
  * A delayed fetch operation that can be created by the replica manager and watched
@@ -57,6 +60,7 @@ class DelayedFetch(delayMs: Long,
                    fetchMetadata: FetchMetadata,
                    replicaManager: ReplicaManager,
                    quota: ReplicaQuota,
+                   isolationLevel: IsolationLevel,
                    responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit)
   extends DelayedOperation(delayMs) {
 
@@ -80,7 +84,9 @@ class DelayedFetch(delayMs: Long,
           if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
             val replica = replicaManager.getLeaderReplicaIfLocal(topicPartition)
             val endOffset =
-              if (fetchMetadata.fetchOnlyCommitted)
+              if (isolationLevel == IsolationLevel.READ_COMMITTED)
+                replica.lastStableOffset
+              else if (fetchMetadata.fetchOnlyCommitted)
                 replica.highWatermark
               else
                 replica.logEndOffset
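
The end-offset selection above is the core of the READ_COMMITTED fetch path: transactional consumers are bounded by the last stable offset, regular consumer fetches by the high watermark, and follower fetches by the log end offset. A standalone sketch of that choice (hypothetical names, not from the commit):

object FetchUpperBoundSketch {
  sealed trait Isolation
  case object ReadCommitted extends Isolation
  case object ReadUncommitted extends Isolation

  def fetchUpperBound(isolation: Isolation, fetchOnlyCommitted: Boolean,
                      lastStableOffset: Long, highWatermark: Long, logEndOffset: Long): Long =
    if (isolation == ReadCommitted) lastStableOffset       // READ_COMMITTED consumers
    else if (fetchOnlyCommitted) highWatermark              // READ_UNCOMMITTED consumers
    else logEndOffset                                       // followers

  def main(args: Array[String]): Unit = {
    println(fetchUpperBound(ReadCommitted, fetchOnlyCommitted = true, 90L, 100L, 120L))    // 90
    println(fetchUpperBound(ReadUncommitted, fetchOnlyCommitted = true, 90L, 100L, 120L))  // 100
    println(fetchUpperBound(ReadUncommitted, fetchOnlyCommitted = false, 90L, 100L, 120L)) // 120
  }
}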

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/server/FetchDataInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala
index acfb5b0..cbd54c0 100644
--- a/core/src/main/scala/kafka/server/FetchDataInfo.scala
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -18,7 +18,9 @@
 package kafka.server
 
 import org.apache.kafka.common.record.Records
+import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 
 case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
                          records: Records,
-                         firstEntryIncomplete: Boolean = false)
+                         firstEntryIncomplete: Boolean = false,
+                         abortedTransactions: Option[List[AbortedTransaction]] = None)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3d821f7..fbd74ac 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer
 import java.lang.{Long => JLong}
 import java.util.{Collections, Properties}
 import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.admin.{AdminUtils, RackAwareMode}
 import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse}
@@ -41,7 +43,7 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, TimestampType}
+import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -436,6 +438,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         produceRequest.timeout.toLong,
         produceRequest.acks,
         internalTopicsAllowed,
+        isFromClient = true,
         authorizedRequestInfo,
         sendResponseCallback)
 
@@ -495,8 +498,9 @@ class KafkaApis(val requestChannel: RequestChannel,
             case _ => data
           }
 
+          val abortedTransactions = convertedData.abortedTransactions.map(_.asJava).orNull
           tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            convertedData.logStartOffset, null, convertedData.records)
+            convertedData.logStartOffset, abortedTransactions, convertedData.records)
         }
       }
 
@@ -560,7 +564,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         versionId <= 2,
         authorizedRequestInfo,
         replicationQuota(fetchRequest),
-        sendResponseCallback)
+        sendResponseCallback,
+        fetchRequest.isolationLevel)
     }
   }
 
@@ -589,7 +594,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (version == 0)
         handleListOffsetRequestV0(request)
       else
-        handleListOffsetRequestV1(request)
+        handleListOffsetRequestV1AndAbove(request)
 
     def createResponse(throttleTimeMs: Int): AbstractResponse = new ListOffsetResponse(throttleTimeMs, mergedResponseMap.asJava)
     sendResponseMaybeThrottle(request, createResponse)
@@ -646,7 +651,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     responseMap ++ unauthorizedResponseStatus
   }
 
-  private def handleListOffsetRequestV1(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
@@ -679,9 +684,13 @@ class KafkaApis(val requestChannel: RequestChannel,
             replicaManager.getReplicaOrException(topicPartition)
 
           val found = {
-            if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
-              TimestampOffset(RecordBatch.NO_TIMESTAMP, localReplica.highWatermark.messageOffset)
-            else {
+            if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
+              val lastFetchableOffset = offsetRequest.isolationLevel match {
+                case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset
+                case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset
+              }
+              TimestampOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset)
+            } else {
               def allowed(timestampOffset: TimestampOffset): Boolean =
                 !fromConsumer || timestampOffset.offset <= localReplica.highWatermark.messageOffset
 
@@ -1415,9 +1424,45 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
     authorizeClusterAction(request)
-    val emptyResponse = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
-    val responseBody = new WriteTxnMarkersResponse(emptyResponse)
-    sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody))
+    val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
+    val errors = new ConcurrentHashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
+    val markers = writeTxnMarkersRequest.markers
+    val numAppends = new AtomicInteger(markers.size)
+
+    if (numAppends.get == 0) {
+      sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
+      return
+    }
+
+    def sendResponseCallback(pid: Long)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
+      errors.put(pid, responseStatus.mapValues(_.error).asJava)
+      if (numAppends.decrementAndGet() == 0)
+        sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
+    }
+
+    // TODO: The current append API makes doing separate writes per producerId a little easier, but it would
+    // be nice to have only one append to the log. This requires pushing the building of the control records
+    // into Log so that we only append those having a valid producer epoch, and exposing a new appendControlRecord
+    // API in ReplicaManager. For now, we've done the simpler approach
+    for (marker <- markers.asScala) {
+      val producerId = marker.producerId
+      val controlRecords = marker.partitions.asScala.map { partition =>
+        val controlRecordType = marker.transactionResult match {
+          case TransactionResult.COMMIT => ControlRecordType.COMMIT
+          case TransactionResult.ABORT => ControlRecordType.ABORT
+        }
+        val endTxnMarker = new EndTransactionMarker(controlRecordType, marker.coordinatorEpoch)
+        partition -> MemoryRecords.withEndTransactionMarker(producerId, marker.producerEpoch, endTxnMarker)
+      }.toMap
+
+      replicaManager.appendRecords(
+        timeout = config.requestTimeoutMs.toLong,
+        requiredAcks = -1,
+        internalTopicsAllowed = true,
+        isFromClient = false,
+        entriesPerPartition = controlRecords,
+        sendResponseCallback(producerId))
+    }
   }
 
   def handleAddPartitionToTxnRequest(request: RequestChannel.Request): Unit = {

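Note on handleWriteTxnMarkersRequest: the handler now performs one append per producerId and only answers the coordinator once every append has completed, collecting per-partition errors in a ConcurrentHashMap and counting outstanding appends down with an AtomicInteger. Here is a small standalone sketch of that fan-out/aggregation pattern; the callback and result types are illustrative, not the broker's API.

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicInteger
    import scala.collection.JavaConverters._

    object MarkerFanOutSketch {
      // Fire one asynchronous append per producerId and send a single response when the
      // last callback returns; mirrors the AtomicInteger countdown added to KafkaApis.
      def appendMarkers(producerIds: Seq[Long],
                        appendAsync: (Long, Map[String, String] => Unit) => Unit,
                        sendResponse: Map[Long, Map[String, String]] => Unit): Unit = {
        val errors = new ConcurrentHashMap[Long, Map[String, String]]()
        val remaining = new AtomicInteger(producerIds.size)
        if (remaining.get == 0) { sendResponse(Map.empty); return }

        producerIds.foreach { pid =>
          appendAsync(pid, { partitionErrors =>
            errors.put(pid, partitionErrors)
            if (remaining.decrementAndGet() == 0)
              sendResponse(errors.asScala.toMap)
          })
        }
      }

      def main(args: Array[String]): Unit =
        appendMarkers(
          producerIds = Seq(1L, 2L),
          appendAsync = (pid, cb) => cb(Map("topic-0" -> "NONE")), // pretend the append succeeded
          sendResponse = result => println(result))
    }
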
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index 05e9842..edc010e 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -44,7 +44,7 @@ case class LogOffsetMetadata(messageOffset: Long,
 
   // check if this offset is already on an older segment compared with the given offset
   def onOlderSegment(that: LogOffsetMetadata): Boolean = {
-    if (messageOffsetOnly())
+    if (messageOffsetOnly)
       throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
 
     this.segmentBaseOffset < that.segmentBaseOffset
@@ -52,7 +52,7 @@ case class LogOffsetMetadata(messageOffset: Long,
 
   // check if this offset is on the same segment with the given offset
   def onSameSegment(that: LogOffsetMetadata): Boolean = {
-    if (messageOffsetOnly())
+    if (messageOffsetOnly)
       throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
 
     this.segmentBaseOffset == that.segmentBaseOffset
@@ -68,14 +68,14 @@ case class LogOffsetMetadata(messageOffset: Long,
   def positionDiff(that: LogOffsetMetadata): Int = {
     if(!onSameSegment(that))
       throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
-    if(messageOffsetOnly())
+    if(messageOffsetOnly)
       throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")
 
     this.relativePositionInSegment - that.relativePositionInSegment
   }
 
   // decide if the offset metadata only contains message offset info
-  def messageOffsetOnly(): Boolean = {
+  def messageOffsetOnly: Boolean = {
     segmentBaseOffset == LogOffsetMetadata.UnknownSegBaseOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition
   }
 

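Note on LogOffsetMetadata: this change is purely stylistic. messageOffsetOnly has no side effects, so it becomes a parameterless def, following the usual Scala convention that parentheses are reserved for methods that do something. A tiny illustration of that convention, with invented field names:

    final case class OffsetMetadata(segmentBaseOffset: Long, relativePosition: Int) {
      // No side effects, so no parentheses: callers write m.messageOffsetOnly, not m.messageOffsetOnly().
      def messageOffsetOnly: Boolean =
        segmentBaseOffset == -1L && relativePosition == -1
    }
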
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index de670e8..663ab1e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -95,7 +96,8 @@ case class LogReadResult(info: FetchDataInfo,
 
 }
 
-case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, logStartOffset: Long, records: Records)
+case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, logStartOffset: Long, records: Records,
+                              abortedTransactions: Option[List[AbortedTransaction]] = None)
 
 object LogReadResult {
   val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
@@ -334,12 +336,14 @@ class ReplicaManager(val config: KafkaConfig,
   def appendRecords(timeout: Long,
                     requiredAcks: Short,
                     internalTopicsAllowed: Boolean,
+                    isFromClient: Boolean,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
 
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
-      val localProduceResults = appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks)
+      val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
+        isFromClient = isFromClient, entriesPerPartition, requiredAcks)
       debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
 
       val produceStatus = localProduceResults.map { case (topicPartition, result) =>
@@ -493,6 +497,7 @@ class ReplicaManager(val config: KafkaConfig,
    * Append the messages to the local replica logs
    */
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
+                               isFromClient: Boolean,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
     trace("Append [%s] to local log ".format(entriesPerPartition))
@@ -510,7 +515,7 @@ class ReplicaManager(val config: KafkaConfig,
           val partitionOpt = getPartition(topicPartition)
           val info = partitionOpt match {
             case Some(partition) =>
-              partition.appendRecordsToLeader(records, requiredAcks)
+              partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
 
             case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
               .format(topicPartition, localBrokerId))
@@ -566,7 +571,8 @@ class ReplicaManager(val config: KafkaConfig,
                     hardMaxBytesLimit: Boolean,
                     fetchInfos: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota = UnboundedQuota,
-                    responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
+                    responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
+                    isolationLevel: IsolationLevel) {
     val isFromFollower = replicaId >= 0
     val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
@@ -579,7 +585,8 @@ class ReplicaManager(val config: KafkaConfig,
       fetchMaxBytes = fetchMaxBytes,
       hardMaxBytesLimit = hardMaxBytesLimit,
       readPartitionInfo = fetchInfos,
-      quota = quota)
+      quota = quota,
+      isolationLevel = isolationLevel)
 
     // if the fetch comes from the follower,
     // update its corresponding log end offset
@@ -598,7 +605,8 @@ class ReplicaManager(val config: KafkaConfig,
     //                        4) some error happens while reading data
     if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
-        tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)
+        tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records,
+          result.info.abortedTransactions)
       }
       responseCallback(fetchPartitionData)
     } else {
@@ -611,7 +619,7 @@ class ReplicaManager(val config: KafkaConfig,
       }
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
         fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
-      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
+      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, isolationLevel, responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
       val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
@@ -632,7 +640,8 @@ class ReplicaManager(val config: KafkaConfig,
                        fetchMaxBytes: Int,
                        hardMaxBytesLimit: Boolean,
                        readPartitionInfo: Seq[(TopicPartition, PartitionData)],
-                       quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
+                       quota: ReplicaQuota,
+                       isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Seq[(TopicPartition, LogReadResult)] = {
 
     def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
       val offset = fetchInfo.fetchOffset
@@ -654,7 +663,9 @@ class ReplicaManager(val config: KafkaConfig,
           getReplicaOrException(tp)
 
         // decide whether to only fetch committed data (i.e. messages below high watermark)
-        val maxOffsetOpt = if (readOnlyCommitted)
+        val maxOffsetOpt = if (isolationLevel == IsolationLevel.READ_COMMITTED)
+          Some(localReplica.lastStableOffset.messageOffset)
+        else if (readOnlyCommitted)
           Some(localReplica.highWatermark.messageOffset)
         else
           None
@@ -674,7 +685,7 @@ class ReplicaManager(val config: KafkaConfig,
             val adjustedFetchSize = math.min(partitionFetchSize, limitBytes)
 
             // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
-            val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)
+            val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage, isolationLevel)
 
             // If the partition is being throttled, simply return an empty set.
             if (shouldLeaderThrottle(quota, tp, replicaId))

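Note on ReplicaManager: appendRecords and appendToLocalLog now carry an isFromClient flag so the log can tell client produce requests apart from broker-internal appends such as the transaction markers written in KafkaApis (which pass isFromClient = false). The sketch below illustrates one plausible use of such a flag, gating stricter validation for client appends; the specific check shown (rejecting control batches from clients) is an assumption for illustration, not a claim about what Log.append does.

    object AppendValidationSketch {
      // Invented record shape for illustration only.
      final case class Batch(isControlBatch: Boolean, payload: String)

      // Assumption for the sketch: client produce requests must not contain control batches,
      // while broker-internal appends (e.g. transaction markers) skip that check.
      def validate(batches: Seq[Batch], isFromClient: Boolean): Either[String, Seq[Batch]] =
        if (isFromClient && batches.exists(_.isControlBatch))
          Left("clients may not write control batches")
        else
          Right(batches)

      def main(args: Array[String]): Unit = {
        val marker = Batch(isControlBatch = true, payload = "COMMIT")
        println(validate(Seq(marker), isFromClient = true))  // Left(clients may not write control batches)
        println(validate(Seq(marker), isFromClient = false)) // Right(List(Batch(true,COMMIT)))
      }
    }
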
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 7a5f671..0b0ad7b 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -105,6 +105,8 @@ object DumpLogSegments {
           dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
         case Log.PidSnapshotFileSuffix =>
           dumpPidSnapshot(file)
+        case Log.TxnIndexFileSuffix =>
+          dumpTxnIndex(file)
         case _ =>
           System.err.println(s"Ignoring unknown file $file")
       }
@@ -131,11 +133,20 @@ object DumpLogSegments {
     }
   }
 
+  private def dumpTxnIndex(file: File): Unit = {
+    val index = new TransactionIndex(Log.offsetFromFilename(file.getName), file)
+    for (abortedTxn <- index.allAbortedTxns) {
+      println(s"version: ${abortedTxn.version} pid: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
+        s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}")
+    }
+  }
+
   private def dumpPidSnapshot(file: File): Unit = {
     try {
-      ProducerIdMapping.readSnapshot(file).foreach { case (pid, entry) =>
-        println(s"pid: $pid epoch: ${entry.epoch} lastSequence: ${entry.lastSeq} lastOffset: ${entry.lastOffset} " +
-          s"offsetDelta: ${entry.offsetDelta} lastTimestamp: ${entry.timestamp}")
+      ProducerStateManager.readSnapshot(file).foreach { entry=>
+        println(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} lastSequence: ${entry.lastSeq} " +
+          s"lastOffset: ${entry.lastOffset} offsetDelta: ${entry.offsetDelta} lastTimestamp: ${entry.timestamp} " +
+          s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset}")
       }
     } catch {
       case e: CorruptSnapshotException =>
@@ -349,9 +360,15 @@ object DumpLogSegments {
               " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
           }
 
-          if (record.isControlRecord) {
-            val controlType = ControlRecordType.parse(record.key)
-            print(s" controlType: $controlType")
+          if (batch.isControlBatch) {
+            val controlTypeId = ControlRecordType.parseTypeId(record.key)
+            ControlRecordType.fromTypeId(controlTypeId) match {
+              case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
+                val endTxnMarker = EndTransactionMarker.deserialize(record)
+                print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
+              case controlType =>
+                print(s" controlType: $controlType($controlTypeId)")
+            }
           } else if (printContents) {
             val (key, payload) = parser.parse(record)
             key.foreach(key => print(s" key: $key"))

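Note on DumpLogSegments: the tool now dumps the new transaction index files and, for control batches, decodes end-transaction markers by reading the control type id from the record key and printing the marker's coordinator epoch for COMMIT/ABORT. Below is a simplified standalone sketch of that dispatch; the key layout and type ids are assumptions made for the sketch, and the real parsing lives in ControlRecordType and EndTransactionMarker.

    import java.nio.ByteBuffer

    object ControlRecordDumpSketch {
      // Assumed key layout for illustration: version (short) followed by control type id (short).
      def describeControlRecord(key: ByteBuffer, coordinatorEpoch: Int): String = {
        key.getShort() // skip the version field
        key.getShort() match {
          case 0     => s"endTxnMarker: ABORT coordinatorEpoch: $coordinatorEpoch"
          case 1     => s"endTxnMarker: COMMIT coordinatorEpoch: $coordinatorEpoch"
          case other => s"controlType: UNKNOWN($other)"
        }
      }

      def main(args: Array[String]): Unit = {
        val key = ByteBuffer.allocate(4).putShort(0, 0.toShort).putShort(2, 1.toShort)
        println(describeControlRecord(key, coordinatorEpoch = 7)) // endTxnMarker: COMMIT coordinatorEpoch: 7
      }
    }
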
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 757e216..4277d26 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -199,7 +199,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createListOffsetsRequest = {
-    requests.ListOffsetRequest.Builder.forConsumer(false).setTargetTimes(
+    requests.ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED).setTargetTimes(
       Map(tp -> (0L: java.lang.Long)).asJava).
       build()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
index b4aa56f..2dfbf48 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
@@ -1124,7 +1124,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
-      EasyMock.anyBoolean(),
+      internalTopicsAllowed = EasyMock.eq(true),
+      isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
@@ -1205,7 +1206,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
-      EasyMock.anyBoolean(),
+      internalTopicsAllowed = EasyMock.eq(true),
+      isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 387d4b3..9053e0a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -28,7 +28,7 @@ import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.OffsetFetchResponse
+import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
@@ -509,7 +509,8 @@ class GroupMetadataManagerTest {
     time.sleep(2)
 
     EasyMock.reset(partition)
-    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), EasyMock.anyInt()))
+    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
+      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -541,7 +542,8 @@ class GroupMetadataManagerTest {
 
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
-    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
+    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
+      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
 
@@ -588,7 +590,8 @@ class GroupMetadataManagerTest {
 
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
     EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
-    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
+    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
+      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(replicaManager, partition)
 
@@ -664,7 +667,8 @@ class GroupMetadataManagerTest {
     EasyMock.reset(partition)
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
-    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
+    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
+      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -738,7 +742,8 @@ class GroupMetadataManagerTest {
 
     // expect the offset tombstone
     EasyMock.reset(partition)
-    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), EasyMock.anyInt()))
+    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
+      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
     EasyMock.replay(partition)
 
@@ -758,7 +763,8 @@ class GroupMetadataManagerTest {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
-      EasyMock.anyBoolean(),
+      internalTopicsAllowed = EasyMock.eq(true),
+      isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
@@ -794,7 +800,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
     EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
-    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true)))
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
     EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
       .andReturn(records.buffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 94dc12b..09a89dd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -27,6 +27,7 @@ import kafka.utils.TestUtils.fail
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.MockTime
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
@@ -349,7 +350,8 @@ class TransactionStateManagerTest {
     EasyMock.expect(replicaManager.getLogEndOffset(topicPartition)).andStubReturn(Some(endOffset))
 
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
-    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true)))
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true),
+      EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
     EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
       .andReturn(records.buffer)
@@ -363,7 +365,8 @@ class TransactionStateManagerTest {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
-      EasyMock.anyBoolean(),
+      internalTopicsAllowed = EasyMock.eq(true),
+      isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument)))
       .andAnswer(new IAnswer[Unit] {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index cfd66de..a42ae22 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -57,7 +57,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
 
     /* append two messages */
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0,
-      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
+          new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
 
     def readBatch(offset: Int) = log.read(offset, 4096).records.batches.iterator.next()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 19a97bc..8a119c2 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -259,7 +259,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
     for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val value = counter.toString
       val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
-        key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
+              key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
       counter += 1
       (key, value, appendInfo.firstOffset)
     }

