kafka-commits mailing list archives

From jun...@apache.org
Subject [2/4] kafka git commit: KAFKA-4817; Add idempotent producer semantics
Date Mon, 03 Apr 2017 02:41:48 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/ProducerIdMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerIdMapping.scala b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
new file mode 100644
index 0000000..a870b7d
--- /dev/null
+++ b/core/src/main/scala/kafka/log/ProducerIdMapping.scala
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.file.Files
+
+import kafka.common.KafkaException
+import kafka.utils.{Logging, nonthreadsafe}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{DuplicateSequenceNumberException, OutOfOrderSequenceException, ProducerFencedException}
+import org.apache.kafka.common.protocol.types._
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
+
+import scala.collection.{immutable, mutable}
+
+private[log] object ProducerIdEntry {
+  val Empty = ProducerIdEntry(RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+    -1, 0, RecordBatch.NO_TIMESTAMP)
+}
+
+private[log] case class ProducerIdEntry(epoch: Short, lastSeq: Int, lastOffset: Long, numRecords: Int, timestamp: Long) {
+  def firstSeq: Int = lastSeq - numRecords + 1
+  def firstOffset: Long = lastOffset - numRecords + 1
+
+  def isDuplicate(batch: RecordBatch): Boolean = {
+    batch.producerEpoch == epoch &&
+      batch.baseSequence == firstSeq &&
+      batch.lastSequence == lastSeq
+  }
+}
+
+private[log] class ProducerAppendInfo(val pid: Long, initialEntry: ProducerIdEntry) {
+  // the initialEntry here is the last successfully appended batch. We validate incoming entries transitively, starting
+  // with the last appended entry.
+  private var epoch = initialEntry.epoch
+  private var firstSeq = initialEntry.firstSeq
+  private var lastSeq = initialEntry.lastSeq
+  private var lastOffset = initialEntry.lastOffset
+  private var maxTimestamp = initialEntry.timestamp
+
+  private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int) = {
+    if (this.epoch > epoch) {
+      throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer with a newer epoch. $epoch (request epoch), ${this.epoch} (server epoch)")
+    } else if (this.epoch == RecordBatch.NO_PRODUCER_EPOCH || this.epoch < epoch) {
+      if (firstSeq != 0)
+        throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " +
+          s"(request epoch), $firstSeq (seq. number)")
+    } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
+      throw new DuplicateSequenceNumberException(s"Duplicate sequence number: $pid (pid), $firstSeq " +
+        s"(seq. number), ${this.firstSeq} (expected seq. number)")
+    } else if (firstSeq != this.lastSeq + 1L) {
+      throw new OutOfOrderSequenceException(s"Invalid sequence number: $pid (pid), $firstSeq " +
+        s"(seq. number), ${this.lastSeq} (expected seq. number)")
+    }
+  }
+
+  def assignLastOffsetAndTimestamp(lastOffset: Long, lastTimestamp: Long): Unit = {
+    this.lastOffset = lastOffset
+    this.maxTimestamp = lastTimestamp
+  }
+
+  private def append(epoch: Short, firstSeq: Int, lastSeq: Int, lastTimestamp: Long, lastOffset: Long) {
+    validateAppend(epoch, firstSeq, lastSeq)
+    this.epoch = epoch
+    this.firstSeq = firstSeq
+    this.lastSeq = lastSeq
+    this.maxTimestamp = lastTimestamp
+    this.lastOffset = lastOffset
+  }
+
+  def append(batch: RecordBatch): Unit =
+    append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset)
+
+  def append(entry: ProducerIdEntry): Unit =
+    append(entry.epoch, entry.firstSeq, entry.lastSeq, entry.timestamp, entry.lastOffset)
+
+  def lastEntry: ProducerIdEntry =
+    ProducerIdEntry(epoch, lastSeq, lastOffset, lastSeq - firstSeq + 1, maxTimestamp)
+}
+
+private[log] class CorruptSnapshotException(msg: String) extends KafkaException(msg)
+
+object ProducerIdMapping {
+  private val DirnamePrefix = "pid-mapping"
+  private val FilenameSuffix = "snapshot"
+  private val FilenamePattern = s"^\\d{1,}\\.$FilenameSuffix".r
+  private val PidSnapshotVersion: Short = 1
+
+  private val VersionField = "version"
+  private val CrcField = "crc"
+  private val PidField = "pid"
+  private val LastSequenceField = "last_sequence"
+  private val EpochField = "epoch"
+  private val LastOffsetField = "last_offset"
+  private val NumRecordsField = "num_records"
+  private val TimestampField = "timestamp"
+  private val PidEntriesField = "pid_entries"
+
+  private val VersionOffset = 0
+  private val CrcOffset = VersionOffset + 2
+  private val PidEntriesOffset = CrcOffset + 4
+
+  private val maxPidSnapshotsToRetain = 2
+
+  val PidSnapshotEntrySchema = new Schema(
+    new Field(PidField, Type.INT64, "The producer ID"),
+    new Field(EpochField, Type.INT16, "Current epoch of the producer"),
+    new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"),
+    new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"),
+    new Field(NumRecordsField, Type.INT32, "The number of records written in the last log entry"),
+    new Field(TimestampField, Type.INT64, "Max timestamp from the last written entry"))
+  val PidSnapshotMapSchema = new Schema(
+    new Field(VersionField, Type.INT16, "Version of the snapshot file"),
+    new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
+    new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table"))
+
+  private def loadSnapshot(file: File, pidMap: mutable.Map[Long, ProducerIdEntry],
+                           checkNotExpired: (ProducerIdEntry) => Boolean) {
+    val buffer = Files.readAllBytes(file.toPath)
+    val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
+
+    val version = struct.getShort(VersionField)
+    if (version != PidSnapshotVersion)
+      throw new IllegalArgumentException(s"Unhandled snapshot file version $version")
+
+    val crc = struct.getUnsignedInt(CrcField)
+    val computedCrc =  Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset)
+    if (crc != computedCrc)
+      throw new CorruptSnapshotException(s"Snapshot file is corrupted (CRC is no longer valid). Stored crc: ${crc}. Computed crc: ${computedCrc}")
+
+    struct.getArray(PidEntriesField).foreach { pidEntryObj =>
+      val pidEntryStruct = pidEntryObj.asInstanceOf[Struct]
+      val pid = pidEntryStruct.getLong(PidField)
+      val epoch = pidEntryStruct.getShort(EpochField)
+      val seq = pidEntryStruct.getInt(LastSequenceField)
+      val offset = pidEntryStruct.getLong(LastOffsetField)
+      val timestamp = pidEntryStruct.getLong(TimestampField)
+      val numRecords = pidEntryStruct.getInt(NumRecordsField)
+      val newEntry = ProducerIdEntry(epoch, seq, offset, numRecords, timestamp)
+      if (checkNotExpired(newEntry))
+        pidMap.put(pid, newEntry)
+    }
+  }
+
+  private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) {
+    val struct = new Struct(PidSnapshotMapSchema)
+    struct.set(VersionField, PidSnapshotVersion)
+    struct.set(CrcField, 0L) // we'll fill this after writing the entries
+    val entriesArray = entries.map {
+      case (pid, entry) =>
+        val pidEntryStruct = struct.instance(PidEntriesField)
+        pidEntryStruct.set(PidField, pid)
+          .set(EpochField, entry.epoch)
+          .set(LastSequenceField, entry.lastSeq)
+          .set(LastOffsetField, entry.lastOffset)
+          .set(NumRecordsField, entry.numRecords)
+          .set(TimestampField, entry.timestamp)
+        pidEntryStruct
+    }.toArray
+    struct.set(PidEntriesField, entriesArray)
+
+    val buffer = ByteBuffer.allocate(struct.sizeOf)
+    struct.writeTo(buffer)
+    buffer.flip()
+
+    // now fill in the CRC
+    val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset)
+    ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
+
+    val fos = new FileOutputStream(file)
+    try {
+      fos.write(buffer.array, buffer.arrayOffset, buffer.limit)
+    } finally {
+      fos.close()
+    }
+  }
+
+  private def verifyFileName(name: String): Boolean = FilenamePattern.findFirstIn(name).isDefined
+
+  private def offsetFromFile(file: File): Long = {
+    file.getName.replace(s".$FilenameSuffix", "").toLong
+  }
+
+  private def formatFileName(lastOffset: Long): String = {
+    // The files will be named '$lastOffset.snapshot' and located in 'logDir/pid-mapping'
+    s"$lastOffset.$FilenameSuffix"
+  }
+
+}
+
+/**
+ * Maintains a mapping from ProducerIds (PIDs) to metadata about the last appended entries (e.g.
+ * epoch, sequence number, last offset, etc.)
+ *
+ * The sequence number is the last number successfully appended to the partition for the given identifier.
+ * The epoch is used for fencing against zombie writers. The offset is that of the last message
+ * successfully appended to the partition.
+ */
+@nonthreadsafe
+class ProducerIdMapping(val config: LogConfig,
+                        val topicPartition: TopicPartition,
+                        val snapParentDir: File,
+                        val maxPidExpirationMs: Int) extends Logging {
+  import ProducerIdMapping._
+
+  val snapDir: File = new File(snapParentDir, DirnamePrefix)
+  snapDir.mkdir()
+
+  private val pidMap = mutable.Map[Long, ProducerIdEntry]()
+  private var lastMapOffset = 0L
+  private var lastSnapOffset = 0L
+
+  /**
+   * Returns the last offset of this map
+   */
+  def mapEndOffset = lastMapOffset
+
+  /**
+   * Get a copy of the active producers
+   */
+  def activePids: immutable.Map[Long, ProducerIdEntry] = pidMap.toMap
+
+  /**
+   * Load a snapshot of the id mapping, or reset to an empty mapping
+   * if no snapshot exists (i.e. on the first load).
+   */
+  private def loadFromSnapshot(logEndOffset: Long, checkNotExpired:(ProducerIdEntry) => Boolean) {
+    pidMap.clear()
+
+    var loaded = false
+    while (!loaded) {
+      lastSnapshotFile(logEndOffset) match {
+        case Some(file) =>
+          try {
+            loadSnapshot(file, pidMap, checkNotExpired)
+            lastSnapOffset = offsetFromFile(file)
+            lastMapOffset = lastSnapOffset
+            loaded = true
+          } catch {
+            case e: CorruptSnapshotException =>
+              error(s"Snapshot file at ${file} is corrupt: ${e.getMessage}")
+              file.delete()
+          }
+        case None =>
+          lastSnapOffset = 0L
+          lastMapOffset = 0L
+          snapDir.mkdir()
+          loaded = true
+      }
+    }
+  }
+
+  def isEntryValid(currentTimeMs: Long, producerIdEntry: ProducerIdEntry) : Boolean = {
+    currentTimeMs - producerIdEntry.timestamp < maxPidExpirationMs
+  }
+
+  def checkForExpiredPids(currentTimeMs: Long) {
+    pidMap.retain { case (pid, lastEntry) =>
+      isEntryValid(currentTimeMs, lastEntry)
+    }
+  }
+
+  def truncateAndReload(logEndOffset: Long, currentTime: Long) {
+    truncateSnapshotFiles(logEndOffset)
+    def checkNotExpired = (producerIdEntry: ProducerIdEntry) => { isEntryValid(currentTime, producerIdEntry) }
+    loadFromSnapshot(logEndOffset, checkNotExpired)
+  }
+
+  /**
+   * Update the mapping with the given append information
+   */
+  def update(appendInfo: ProducerAppendInfo): Unit = {
+    if (appendInfo.pid == RecordBatch.NO_PRODUCER_ID)
+      throw new IllegalArgumentException("Invalid PID passed to update")
+    val entry = appendInfo.lastEntry
+    pidMap.put(appendInfo.pid, entry)
+    lastMapOffset = entry.lastOffset + 1
+  }
+
+  /**
+   * Load a previously stored PID entry into the cache. Ignore the entry if the timestamp is older
+   * than the current time minus the PID expiration time (i.e. if the PID has expired).
+   */
+  def load(pid: Long, entry: ProducerIdEntry, currentTimeMs: Long) {
+    if (pid != RecordBatch.NO_PRODUCER_ID && currentTimeMs - entry.timestamp < maxPidExpirationMs) {
+      pidMap.put(pid, entry)
+      lastMapOffset = entry.lastOffset + 1
+    }
+  }
+
+  /**
+   * Get the last written entry for the given PID.
+   */
+  def lastEntry(pid: Long): Option[ProducerIdEntry] = pidMap.get(pid)
+
+  /**
+    * Serialize the mapping and write it to a snapshot file. The file name is a concatenation of:
+    *   - the last map offset
+    *   - a ".snapshot" suffix
+    *
+    *  The snapshot files are located in the log directory, inside a 'pid-mapping' subdirectory.
+    */
+  def maybeTakeSnapshot() {
+    // If not a new offset, then it is not worth taking another snapshot
+    if (lastMapOffset > lastSnapOffset) {
+      val file = new File(snapDir, formatFileName(lastMapOffset))
+      writeSnapshot(file, pidMap)
+
+      // Update the last snap offset according to the serialized map
+      lastSnapOffset = lastMapOffset
+
+      maybeRemove()
+    }
+  }
+
+  /**
+    * When we remove the head of the log due to retention, we need to
+    * clean up the id map. This method takes the new start offset and
+    * expires all ids that have a smaller offset.
+    *
+    * @param startOffset New start offset for the log associated with
+    *                    this id map instance
+    */
+  def cleanFrom(startOffset: Long) {
+    pidMap.retain((pid, entry) => entry.firstOffset >= startOffset)
+    if (pidMap.isEmpty)
+      lastMapOffset = -1L
+  }
+
+  private def maybeRemove() {
+    val list = listSnapshotFiles()
+    if (list.size > maxPidSnapshotsToRetain) {
+      // Get the file with the smallest offset
+      val toDelete = list.minBy(offsetFromFile)
+      // Delete the oldest snapshot
+      toDelete.delete()
+    }
+  }
+
+  private def listSnapshotFiles(): List[File] = {
+    if (snapDir.exists && snapDir.isDirectory)
+      snapDir.listFiles.filter(f => f.isFile && verifyFileName(f.getName)).toList
+    else
+      List.empty[File]
+  }
+
+  /**
+   * Returns the latest snapshot file whose offset is no greater than the given max offset,
+   * if such a file exists.
+   */
+  private def lastSnapshotFile(maxOffset: Long): Option[File] = {
+    val files = listSnapshotFiles()
+    if (files != null && files.nonEmpty) {
+      val targetOffset = files.foldLeft(0L) { (accOffset, file) =>
+        val snapshotLastOffset = offsetFromFile(file)
+        if ((maxOffset >= snapshotLastOffset) && (snapshotLastOffset > accOffset))
+          snapshotLastOffset
+        else
+          accOffset
+      }
+      val snap = new File(snapDir, formatFileName(targetOffset))
+      if (snap.exists)
+        Some(snap)
+      else
+        None
+    } else
+      None
+  }
+
+  private def truncateSnapshotFiles(maxOffset: Long) {
+    listSnapshotFiles().foreach { file =>
+      val snapshotLastOffset = offsetFromFile(file)
+      if (snapshotLastOffset >= maxOffset)
+        file.delete()
+    }
+  }
+}
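
The sequence and epoch checks in ProducerAppendInfo.validateAppend above are the heart of the
idempotence guarantee. The standalone sketch below restates those rules outside the class; it is
illustrative only (the names are hypothetical, the NO_PRODUCER_EPOCH sentinel is folded into the
plain epoch comparison, and outcomes are returned instead of throwing the exception types used in
the patch).

    object SequenceValidationSketch {
      sealed trait Outcome
      case object Ok extends Outcome          // contiguous append, accepted
      case object Fenced extends Outcome      // request epoch is older than the cached epoch
      case object Duplicate extends Outcome   // exact retry of the last appended batch
      case object OutOfOrder extends Outcome  // sequence gap, or a new epoch not starting at 0

      def check(cachedEpoch: Short, cachedFirstSeq: Int, cachedLastSeq: Int,
                reqEpoch: Short, reqFirstSeq: Int, reqLastSeq: Int): Outcome = {
        if (cachedEpoch > reqEpoch) {
          Fenced
        } else if (cachedEpoch < reqEpoch) {
          // a bumped epoch must restart at sequence 0
          if (reqFirstSeq == 0) Ok else OutOfOrder
        } else if (reqFirstSeq == cachedFirstSeq && reqLastSeq == cachedLastSeq) {
          Duplicate
        } else if (reqFirstSeq != cachedLastSeq + 1) {
          OutOfOrder
        } else {
          Ok
        }
      }

      def main(args: Array[String]): Unit = {
        // cached state: epoch 5, last batch covered sequences 0..9
        println(check(5, 0, 9, 5, 10, 19))  // Ok: contiguous with the cached batch
        println(check(5, 0, 9, 5, 0, 9))    // Duplicate: exact retry of the last batch
        println(check(5, 0, 9, 5, 12, 15))  // OutOfOrder: gap after sequence 9
        println(check(5, 0, 9, 4, 10, 19))  // Fenced: stale producer epoch
      }
    }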

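For loading, lastSnapshotFile above folds over the available snapshot files and keeps the newest
one whose offset does not run past the requested offset. The same selection, restated over plain
offsets (a simplified sketch with hypothetical names, not code from the patch):

    object SnapshotSelectionSketch {
      // Pick the largest snapshot offset that does not exceed maxOffset, if any.
      def pickSnapshotOffset(snapshotOffsets: Seq[Long], maxOffset: Long): Option[Long] = {
        val candidates = snapshotOffsets.filter(_ <= maxOffset)
        if (candidates.isEmpty) None else Some(candidates.max)
      }

      def main(args: Array[String]): Unit = {
        val offsets = Seq(100L, 250L, 400L)
        println(pickSnapshotOffset(offsets, 300L))  // Some(250): newest snapshot at or below the requested offset
        println(pickSnapshotOffset(offsets, 50L))   // None: every snapshot is ahead of the requested offset
      }
    }

Note that truncateAndReload deletes any snapshot at or beyond the truncation offset first, so the
file chosen this way never describes state past the current log end.
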
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index defbf34..600b84d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
-import java.lang.{Long => JLong, Short => JShort}
+import java.lang.{Long => JLong}
 import java.util.{Collections, Properties}
 import java.util
 
@@ -28,7 +28,7 @@ import kafka.cluster.Partition
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.common._
 import kafka.controller.KafkaController
-import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
+import kafka.coordinator.{GroupCoordinator, InitPidResult, JoinGroupResult, TransactionCoordinator}
 import kafka.log._
 import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
@@ -49,6 +49,7 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse
 
 import scala.collection._
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 /**
  * Logic to handle the various Kafka requests
@@ -56,7 +57,8 @@ import scala.collection.JavaConverters._
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
                 val adminManager: AdminManager,
-                val coordinator: GroupCoordinator,
+                val groupCoordinator: GroupCoordinator,
+                val txnCoordinator: TransactionCoordinator,
                 val controller: KafkaController,
                 val zkUtils: ZkUtils,
                 val brokerId: Int,
@@ -100,6 +102,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
         case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
         case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
+        case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -138,11 +141,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         // leadership changes
         updatedLeaders.foreach { partition =>
           if (partition.topic == Topic.GroupMetadataTopicName)
-            coordinator.handleGroupImmigration(partition.partitionId)
+            groupCoordinator.handleGroupImmigration(partition.partitionId)
         }
         updatedFollowers.foreach { partition =>
           if (partition.topic == Topic.GroupMetadataTopicName)
-            coordinator.handleGroupEmigration(partition.partitionId)
+            groupCoordinator.handleGroupEmigration(partition.partitionId)
         }
       }
 
@@ -181,7 +184,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         // is not cleared.
         result.foreach { case (topicPartition, error) =>
           if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == Topic.GroupMetadataTopicName) {
-            coordinator.handleGroupEmigration(topicPartition.partition)
+            groupCoordinator.handleGroupEmigration(topicPartition.partition)
           }
         }
         new StopReplicaResponse(error, result.asJava)
@@ -202,7 +205,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
         val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
         if (deletedPartitions.nonEmpty)
-          coordinator.handleDeletedPartitions(deletedPartitions)
+          groupCoordinator.handleDeletedPartitions(deletedPartitions)
 
         if (adminManager.hasDelayedTopicOperations) {
           updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
@@ -305,7 +308,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val offsetRetention =
           if (header.apiVersion <= 1 ||
             offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME)
-            coordinator.offsetConfig.offsetsRetentionMs
+            groupCoordinator.offsetConfig.offsetsRetentionMs
           else
             offsetCommitRequest.retentionTime
 
@@ -332,7 +335,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
 
         // call coordinator to handle commit offset
-        coordinator.handleCommitOffsets(
+        groupCoordinator.handleCommitOffsets(
           offsetCommitRequest.groupId,
           offsetCommitRequest.memberId,
           offsetCommitRequest.generationId,
@@ -792,7 +795,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         java.util.Collections.emptyList())
     } else {
       createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
-        config.offsetsTopicReplicationFactor.toInt, coordinator.offsetsTopicConfigs)
+        config.offsetsTopicReplicationFactor.toInt, groupCoordinator.offsetsTopicConfigs)
     }
   }
 
@@ -946,7 +949,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         } else {
           // versions 1 and above read offsets from Kafka
           if (offsetFetchRequest.isAllPartitions) {
-            val (error, allPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId)
+            val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId)
             if (error != Errors.NONE)
               offsetFetchRequest.getErrorResponse(error)
             else {
@@ -957,7 +960,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           } else {
             val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
               .partition(authorizeTopicDescribe)
-            val (error, authorizedPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId,
+            val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
               Some(authorizedPartitions))
             if (error != Errors.NONE)
               offsetFetchRequest.getErrorResponse(error)
@@ -980,7 +983,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
       requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     } else {
-      val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
+      val partition = groupCoordinator.partitionFor(groupCoordinatorRequest.groupId)
 
       // get metadata (and create the topic if necessary)
       val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName)
@@ -1013,7 +1016,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
           groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
         } else {
-          val (error, summary) = coordinator.handleDescribeGroup(groupId)
+          val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
           val members = summary.members.map { member =>
             val metadata = ByteBuffer.wrap(member.metadata)
             val assignment = ByteBuffer.wrap(member.assignment)
@@ -1032,7 +1035,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
       ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
     } else {
-      val (error, groups) = coordinator.handleListGroups()
+      val (error, groups) = groupCoordinator.handleListGroups()
       val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
       new ListGroupsResponse(error, allGroups.asJava)
     }
@@ -1066,7 +1069,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // let the coordinator to handle join-group
       val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
         (protocol.name, Utils.toArray(protocol.metadata))).toList
-      coordinator.handleJoinGroup(
+      groupCoordinator.handleJoinGroup(
         joinGroupRequest.groupId,
         joinGroupRequest.memberId,
         request.header.clientId,
@@ -1090,7 +1093,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
       sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)
     } else {
-      coordinator.handleSyncGroup(
+      groupCoordinator.handleSyncGroup(
         syncGroupRequest.groupId(),
         syncGroupRequest.generationId(),
         syncGroupRequest.memberId(),
@@ -1117,7 +1120,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
     else {
       // let the coordinator to handle heartbeat
-      coordinator.handleHeartbeat(
+      groupCoordinator.handleHeartbeat(
         heartbeatRequest.groupId(),
         heartbeatRequest.memberId(),
         heartbeatRequest.groupGenerationId(),
@@ -1141,7 +1144,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new Response(request, leaveGroupResponse))
     } else {
       // let the coordinator to handle leave-group
-      coordinator.handleLeaveGroup(
+      groupCoordinator.handleLeaveGroup(
         leaveGroupRequest.groupId(),
         leaveGroupRequest.memberId(),
         sendResponseCallback)
@@ -1308,6 +1311,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleInitPidRequest(request: RequestChannel.Request): Unit = {
+    val initPidRequest = request.body[InitPidRequest]
+    // Send response callback
+    def sendResponseCallback(result: InitPidResult): Unit = {
+      val responseBody: InitPidResponse = new InitPidResponse(result.error, result.pid, result.epoch)
+      trace(s"InitPidRequest: generated new PID ${result.pid} for client ${request.header.clientId}")
+      requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+    }
+    txnCoordinator.handleInitPid(initPidRequest.transactionalId, sendResponseCallback)
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fe6631e..0f2205f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -33,8 +33,8 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.record.TimestampType
 
-import scala.collection.Map
 import scala.collection.JavaConverters._
+import scala.collection.Map
 
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
@@ -163,10 +163,14 @@ object Defaults {
   val NumReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples
   val ReplicationQuotaWindowSizeSeconds: Int = ReplicationQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
 
+  /** ********* Transaction Configuration ***********/
+  val TransactionalIdExpirationMsDefault = 604800000
+
   val DeleteTopicEnable = false
 
   val CompressionType = "producer"
 
+  val MaxIdMapSnapshots = 2
   /** ********* Kafka Metrics Configuration ***********/
   val MetricNumSamples = 2
   val MetricSampleWindowMs = 30000
@@ -194,7 +198,6 @@ object Defaults {
   val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
   val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN
   val SaslKerberosPrincipalToLocalRules = SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
-
 }
 
 object KafkaConfig {
@@ -280,6 +283,7 @@ object KafkaConfig {
   val LogMessageFormatVersionProp = LogConfigPrefix + "message.format.version"
   val LogMessageTimestampTypeProp = LogConfigPrefix + "message.timestamp.type"
   val LogMessageTimestampDifferenceMaxMsProp = LogConfigPrefix + "message.timestamp.difference.max.ms"
+  val LogMaxIdMapSnapshotsProp = LogConfigPrefix + "max.id.map.snapshots"
   val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
   val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
@@ -332,6 +336,8 @@ object KafkaConfig {
   val NumReplicationQuotaSamplesProp = "replication.quota.window.num"
   val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
   val ReplicationQuotaWindowSizeSecondsProp = "replication.quota.window.size.seconds"
+  /** ********* Transaction Configuration **********/
+  val TransactionalIdExpirationMsProp = "transactional.id.expiration.ms"
 
   val DeleteTopicEnableProp = "delete.topic.enable"
   val CompressionTypeProp = "compression.type"
@@ -568,6 +574,11 @@ object KafkaConfig {
   val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas"
   val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas"
   val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas"
+  /** ********* Transaction Configuration ***********/
+  val TransactionIdExpirationMsDoc = "The maximum time of inactivity before a transactional id is expired by the " +
+    "transaction coordinator. Note that this also influences PID expiration: PIDs are guaranteed to expire " +
+    "after expiration of this timeout from the last write by the PID (they may expire sooner if the last write " +
+    "from the PID is deleted due to the topic's retention settings)."
 
   val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
   val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
@@ -763,6 +774,9 @@ object KafkaConfig {
       .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
       .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc)
 
+      /** ********* Transaction configuration ***********/
+      .define(TransactionalIdExpirationMsProp, INT, Defaults.TransactionalIdExpirationMsDefault, atLeast(1), LOW, TransactionIdExpirationMsDoc)
+
       /** ********* SSL Configuration ****************/
       .define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
       .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc)
@@ -989,6 +1003,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val numReplicationQuotaSamples = getInt(KafkaConfig.NumReplicationQuotaSamplesProp)
   val replicationQuotaWindowSizeSeconds = getInt(KafkaConfig.ReplicationQuotaWindowSizeSecondsProp)
 
+  /** ********* Transaction Configuration **************/
+  val transactionIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp)
+
   val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
   val compressionType = getString(KafkaConfig.CompressionTypeProp)
   val listeners: Seq[EndPoint] = getListeners

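As the TransactionIdExpirationMsDoc above spells out, this setting also bounds PID expiration: a
PID becomes eligible to expire once the interval has elapsed since its last write. A minimal
sketch of that rule, mirroring ProducerIdMapping.isEntryValid earlier in this patch (names here
are hypothetical):

    object PidExpirySketch {
      // Expired once the configured interval has elapsed since the timestamp of the last write.
      def pidExpired(currentTimeMs: Long, lastWriteTimestampMs: Long, expirationMs: Int): Boolean =
        currentTimeMs - lastWriteTimestampMs >= expirationMs

      def main(args: Array[String]): Unit = {
        val sevenDaysMs = 604800000  // TransactionalIdExpirationMsDefault added above
        println(pidExpired(sevenDaysMs + 1L, 0L, sevenDaysMs))  // true: past the expiration window
        println(pidExpired(1000L, 0L, sevenDaysMs))             // false: written recently
      }
    }
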
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0d3e49c..e63a6d2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -29,7 +29,7 @@ import kafka.api.KAFKA_0_9_0
 import kafka.cluster.Broker
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
 import kafka.controller.{ControllerStats, KafkaController}
-import kafka.coordinator.GroupCoordinator
+import kafka.coordinator.{GroupCoordinator, TransactionCoordinator}
 import kafka.log.{CleanerConfig, LogConfig, LogManager}
 import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
 import kafka.network.{BlockingChannel, SocketServer}
@@ -37,7 +37,7 @@ import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
@@ -122,6 +122,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   var groupCoordinator: GroupCoordinator = null
 
+  var transactionCoordinator: TransactionCoordinator = null
+
   var kafkaController: KafkaController = null
 
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
@@ -205,7 +207,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
 
         /* start log manager */
-        logManager = createLogManager(zkUtils.zkClient, brokerState)
+        logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time)
         logManager.startup()
 
         metadataCache = new MetadataCache(config.brokerId)
@@ -229,6 +231,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
         groupCoordinator.startup()
 
+        /* start transaction coordinator */
+        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise; it would be good to fix the underlying issue
+        transactionCoordinator = TransactionCoordinator(config, zkUtils, Time.SYSTEM)
+        transactionCoordinator.startup()
+
         /* Get the authorizer and initialize it if one is specified.*/
         authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
           val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
@@ -237,9 +244,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         }
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
-          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
-          clusterId, time)
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, clusterId, time)
 
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
           config.numIoThreads)
@@ -403,8 +409,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         while (!shutdownSucceeded && remainingRetries > 0) {
           remainingRetries = remainingRetries - 1
 
-          import NetworkClientBlockingOps._
-
           // 1. Find the controller and establish a connection to it.
 
           // Get the current controller info. This is to ensure we use the most recent info to issue the
@@ -431,14 +435,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           if (prevController != null) {
             try {
 
-              if (!networkClient.blockingReady(node(prevController), socketTimeoutMs)(time))
+              if (!NetworkClientUtils.awaitReady(networkClient, node(prevController), time, socketTimeoutMs))
                 throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
               // send the controlled shutdown request
               val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId)
               val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
                 time.milliseconds(), true)
-              val clientResponse = networkClient.blockingSendAndReceive(request)(time)
+              val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)
 
               val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
               if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining.isEmpty) {
@@ -633,36 +637,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
 
-  private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
-    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
-    val defaultLogConfig = LogConfig(defaultProps)
-
-    val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
-      topic -> LogConfig.fromProps(defaultProps, configs)
-    }
-    // read the log configurations from zookeeper
-    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
-                                      dedupeBufferSize = config.logCleanerDedupeBufferSize,
-                                      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
-                                      ioBufferSize = config.logCleanerIoBufferSize,
-                                      maxMessageSize = config.messageMaxBytes,
-                                      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
-                                      backOffMs = config.logCleanerBackoffMs,
-                                      enableCleaner = config.logCleanerEnable)
-    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
-                   topicConfigs = configs,
-                   defaultConfig = defaultLogConfig,
-                   cleanerConfig = cleanerConfig,
-                   ioThreads = config.numRecoveryThreadsPerDataDir,
-                   flushCheckMs = config.logFlushSchedulerIntervalMs,
-                   flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
-                   flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
-                   retentionCheckMs = config.logCleanupIntervalMs,
-                   scheduler = kafkaScheduler,
-                   brokerState = brokerState,
-                   time = time)
-  }
-
   /**
     * Generates new brokerId if enabled or reads from meta.properties based on following conditions
     * <ol>

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5f055a6..cce59ce 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -27,7 +27,7 @@ import kafka.api.{FetchRequest => _, _}
 import kafka.common.KafkaStorageException
 import ReplicaFetcherThread._
 import kafka.utils.Exit
-import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
@@ -248,14 +248,13 @@ class ReplicaFetcherThread(name: String,
   }
 
   private def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse = {
-    import kafka.utils.NetworkClientBlockingOps._
     try {
-      if (!networkClient.blockingReady(sourceNode, socketTimeout)(time))
+      if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
         throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
       else {
         val clientRequest = networkClient.newClientRequest(sourceBroker.id.toString, requestBuilder,
           time.milliseconds(), true)
-        networkClient.blockingSendAndReceive(clientRequest)(time)
+        NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
       }
     }
     catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
deleted file mode 100644
index 0370564..0000000
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.io.IOException
-
-import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient}
-import org.apache.kafka.common.Node
-import org.apache.kafka.common.requests.AbstractRequest
-import org.apache.kafka.common.utils.Time
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-
-object NetworkClientBlockingOps {
-  implicit def networkClientBlockingOps(client: NetworkClient): NetworkClientBlockingOps =
-    new NetworkClientBlockingOps(client)
-}
-
-/**
- * Provides extension methods for `NetworkClient` that are useful for implementing blocking behaviour. Use with care.
- *
- * Example usage:
- *
- * {{{
- * val networkClient: NetworkClient = ...
- * import NetworkClientBlockingOps._
- * networkClient.blockingReady(...)
- * }}}
- */
-class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
-
-  /**
-    * Checks whether the node is currently connected, first calling `client.poll` to ensure that any pending
-    * disconnects have been processed.
-    *
-    * This method can be used to check the status of a connection prior to calling `blockingReady` to be able
-    * to tell whether the latter completed a new connection.
-    */
-  def isReady(node: Node)(implicit time: Time): Boolean = {
-    val currentTime = time.milliseconds()
-    client.poll(0, currentTime)
-    client.isReady(node, currentTime)
-  }
-
-  /**
-   * Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll`
-   * invocations until the connection to `node` is ready, the timeout expires or the connection fails.
-   *
-   * It returns `true` if the call completes normally or `false` if the timeout expires. If the connection fails,
-   * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive
-   * connection timeout, it is possible for this method to raise an `IOException` for a previous connection which
-   * has recently disconnected.
-   *
-   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
-   * care.
-   */
-  def blockingReady(node: Node, timeout: Long)(implicit time: Time): Boolean = {
-    require(timeout >=0, "timeout should be >= 0")
-
-    val startTime = time.milliseconds()
-    val expiryTime = startTime + timeout
-
-    @tailrec
-    def awaitReady(iterationStartTime: Long): Boolean = {
-      if (client.isReady(node, iterationStartTime))
-        true
-      else if (client.connectionFailed(node))
-        throw new IOException(s"Connection to $node failed")
-      else {
-        val pollTimeout = expiryTime - iterationStartTime
-        client.poll(pollTimeout, iterationStartTime)
-        val afterPollTime = time.milliseconds()
-        if (afterPollTime < expiryTime) awaitReady(afterPollTime)
-        else false
-      }
-    }
-
-    isReady(node) || client.ready(node, startTime) || awaitReady(startTime)
-  }
-
-  /**
-   * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received or a
-   * disconnection happens (which can happen for a number of reasons including a request timeout).
-   *
-   * In case of a disconnection, an `IOException` is thrown.
-   *
-   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
-   * care.
-   */
-  def blockingSendAndReceive(request: ClientRequest)(implicit time: Time): ClientResponse = {
-    client.send(request, time.milliseconds())
-    pollContinuously { responses =>
-      val response = responses.find { response =>
-        response.requestHeader.correlationId == request.correlationId
-      }
-      response.foreach { r =>
-        if (r.wasDisconnected)
-          throw new IOException(s"Connection to ${request.destination} was disconnected before the response was read")
-        else if (r.versionMismatch() != null)
-          throw r.versionMismatch();
-      }
-      response
-    }
-  }
-
-  /**
-    * Invokes `client.poll` until `collect` returns `Some`. The value inside `Some` is returned.
-    *
-    * Exceptions thrown via `collect` are not handled and will bubble up.
-    *
-    * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
-    * care.
-    */
-  private def pollContinuously[T](collect: Seq[ClientResponse] => Option[T])(implicit time: Time): T = {
-
-    @tailrec
-    def recursivePoll: T = {
-      // rely on request timeout to ensure we don't block forever
-      val responses = client.poll(Long.MaxValue, time.milliseconds()).asScala
-      collect(responses) match {
-        case Some(result) => result
-        case None => recursivePoll
-      }
-    }
-
-    recursivePoll
-  }
-
-}

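The file removed above exposed its blocking helpers through an implicit conversion
(networkClient.blockingReady(node, timeout)(time)); the call sites touched by this patch
(KafkaServer controlled shutdown and ReplicaFetcherThread) now go through the static
NetworkClientUtils methods instead. A minimal sketch of the new call pattern, assuming the
NetworkClientUtils signatures used in the diffs above:

    import java.net.SocketTimeoutException

    import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, NetworkClientUtils}
    import org.apache.kafka.common.Node
    import org.apache.kafka.common.utils.Time

    object BlockingSendSketch {
      // Wait for the connection to become ready (bounded by the socket timeout), then send the
      // request and block until the matching response arrives or the connection drops.
      def sendBlocking(client: NetworkClient, node: Node, request: ClientRequest,
                       time: Time, socketTimeoutMs: Long): ClientResponse = {
        if (!NetworkClientUtils.awaitReady(client, node, time, socketTimeoutMs))
          throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
        NetworkClientUtils.sendAndReceive(client, request, time)
      }
    }
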
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index aa55479..6ff5c5f 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -31,9 +31,6 @@ import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMars
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
@@ -65,6 +62,7 @@ object ZkUtils {
   val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election"
   val BrokerSequenceIdPath = s"$BrokersPath/seqid"
   val ConfigChangesPath = s"$ConfigPath/changes"
+  val PidBlockPath = "/latest_pid_block"
 
 
   // Important: it is necessary to add any new top level Zookeeper path to the Seq
@@ -76,7 +74,8 @@ object ZkUtils {
                               ControllerEpochPath,
                               IsrChangeNotificationPath,
                               KafkaAclPath,
-                              KafkaAclChangesPath)
+                              KafkaAclChangesPath,
+                              PidBlockPath)
 
   def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
     val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
@@ -217,7 +216,8 @@ class ZkUtils(val zkClient: ZkClient,
                               getEntityConfigRootPath(ConfigType.Client),
                               DeleteTopicsPath,
                               BrokerSequenceIdPath,
-                              IsrChangeNotificationPath)
+                              IsrChangeNotificationPath,
+                              PidBlockPath)
 
   val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)
 
@@ -529,12 +529,12 @@ class ZkUtils(val zkClient: ZkClient,
           case Some(checker) => checker(this, path, data)
           case _ =>
             debug("Checker method is not passed skipping zkData match")
-            warn("Conditional update of path %s with data %s and expected version %d failed due to %s"
+            debug("Conditional update of path %s with data %s and expected version %d failed due to %s"
               .format(path, data,expectVersion, e1.getMessage))
             (false, -1)
         }
       case e2: Exception =>
-        warn("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
+        debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
           expectVersion, e2.getMessage))
         (false, -1)
     }
@@ -624,6 +624,20 @@ class ZkUtils(val zkClient: ZkClient,
     dataAndStat
   }
 
+  def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = {
+    val stat = new Stat()
+    try {
+      val data: String = zkClient.readData(path, stat)
+      if (data == null.asInstanceOf[String])
+        (None, stat.getVersion)
+      else
+        (Some(data), stat.getVersion)
+    } catch {
+      case _: ZkNoNodeException =>
+        (None, stat.getVersion)
+    }
+  }
+
   def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
 
   def getChildrenParentMayNotExist(path: String): Seq[String] = {
@@ -719,6 +733,14 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }
 
+  def getTopicPartitionCount(topic: String): Option[Int] = {
+    val topicData = getPartitionAssignmentForTopics(Seq(topic))
+    if (topicData(topic).nonEmpty)
+      Some(topicData(topic).size)
+    else
+      None
+  }
+
   def getPartitionsBeingReassigned(): Map[TopicAndPartition, ReassignedPartitionsContext] = {
     // read the partitions and their new replica list
     val jsonPartitionMapOpt = readDataMaybeNull(ReassignPartitionsPath)._1

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 852377c..5aeeefe 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -14,6 +14,7 @@
 package kafka.api
 
 import java.util.Properties
+import java.util.concurrent.Future
 
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
@@ -24,11 +25,13 @@ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
+import scala.collection.mutable.ArrayBuffer
+
 class ProducerBounceTest extends KafkaServerTestHarness {
-  private val producerBufferSize = 30000
+  private val producerBufferSize =  65536
   private val serverMessageMaxBytes =  producerBufferSize/2
 
-  val numServers = 2
+  val numServers = 4
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
@@ -36,7 +39,9 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
   // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
   overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
-
+  overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
+  overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
+  overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
   // This is one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient
   // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing
   // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation
@@ -47,31 +52,19 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
   // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
   override def generateConfigs() = {
-    FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = false)
+    FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
 
-  private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
-  private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
-  private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
-
   private val topic1 = "topic-1"
 
   @Before
   override def setUp() {
     super.setUp()
-
-    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, bufferSize = producerBufferSize)
-    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, bufferSize = producerBufferSize)
-    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, bufferSize = producerBufferSize)
   }
 
   @After
   override def tearDown() {
-    if (producer1 != null) producer1.close
-    if (producer2 != null) producer2.close
-    if (producer3 != null) producer3.close
-
     super.tearDown()
   }
 
@@ -81,19 +74,25 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   @Test
   def testBrokerFailure() {
     val numPartitions = 3
-    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers)
+    val topicConfig = new Properties()
+    topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig)
+
     assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
 
     val scheduler = new ProducerScheduler()
     scheduler.start
 
     // rolling bounce brokers
+
     for (_ <- 0 until numServers) {
       for (server <- servers) {
+        info("Shutting down server : %s".format(server.config.brokerId))
         server.shutdown()
         server.awaitShutdown()
+        info("Server %s shut down. Starting it up again.".format(server.config.brokerId))
         server.startup()
-        Thread.sleep(2000)
+        info("Restarted server: %s".format(server.config.brokerId))
       }
 
       // Make sure the producers do not see any exceptions in the returned metadata due to broker failures
@@ -121,8 +120,9 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     val messages = fetchResponses.flatMap(r => r.iterator.toList.map(_.message))
     val uniqueMessages = messages.toSet
     val uniqueMessageSize = uniqueMessages.size
-
-    assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
+    info(s"number of unique messages sent: ${uniqueMessageSize}")
+    assertEquals(s"Found ${messages.size - uniqueMessageSize} duplicate messages.", uniqueMessageSize, messages.size)
+    assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, messages.size)
   }
 
   private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) {
@@ -130,26 +130,51 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     var sent = 0
     var failed = false
 
-    val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10)
+    val producerConfig = new Properties()
+    producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    val producerConfigWithCompression = new Properties()
+    producerConfigWithCompression.putAll(producerConfig)
+    producerConfigWithCompression.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
+    val producers = List(
+      TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 4, retries = 10, props = Some(producerConfig)),
+      TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 2, retries = 10, lingerMs = 5000, props = Some(producerConfig)),
+      TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10, lingerMs = 10000, props = Some(producerConfigWithCompression))
+    )
 
     override def doWork(): Unit = {
-      val responses =
-        for (i <- sent+1 to sent+numRecords)
-        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes),
-                            new ErrorLoggingCallback(topic1, null, null, true))
-      val futures = responses.toList
+      info("Starting to send messages..")
+      var producerId = 0
+      val responses = new ArrayBuffer[IndexedSeq[Future[RecordMetadata]]]()
+      for (producer <- producers) {
+        val response =
+          for (i <- sent+1 to sent+numRecords)
+            yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, ((producerId + 1) * i).toString.getBytes),
+              new ErrorLoggingCallback(topic1, null, null, true))
+        responses.append(response)
+        producerId += 1
+      }
 
       try {
-        futures.map(_.get)
-        sent += numRecords
+        for (response <- responses) {
+          val futures = response.toList
+          futures.map(_.get)
+          sent += numRecords
+        }
+        info(s"Sent $sent records")
       } catch {
-        case _ : Exception => failed = true
+        case e: Exception =>
+          error(s"Got exception ${e.getMessage}")
+          e.printStackTrace()
+          failed = true
       }
     }
 
     override def shutdown(){
       super.shutdown()
-      producer.close
+      for (producer <- producers) {
+        producer.close()
+      }
     }
   }
 }

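For context on the producer-side settings exercised by the updated bounce test above (idempotence enabled, a single in-flight request, bounded retries), here is a minimal standalone sketch of such a configuration. The broker address "localhost:9092" and the acks/serializer settings are illustrative assumptions, not taken from the patch:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer

object IdempotentProducerSketch extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")          // broker dedupes on (pid, epoch, sequence)
  props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") // as in the test above
  props.put(ProducerConfig.ACKS_CONFIG, "all")                         // idempotence requires acks=all
  props.put(ProducerConfig.RETRIES_CONFIG, "10")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)

  val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
  try {
    // With idempotence enabled, internal retries of this send will not create duplicates on the broker.
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("topic-1", "hello".getBytes)).get()
  } finally {
    producer.close()
  }
}
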
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 8a198eb..61199c2 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -79,14 +79,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
     props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
 
-    // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
-    val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
-    ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
-
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
 
     zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(Topic.GroupMetadataTopicName))).andReturn(ret)
+    // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
+    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2))
     EasyMock.replay(zkUtils)
 
     timer = new MockTimer

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 6b1abf3..9d38485 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -69,11 +69,8 @@ class GroupMetadataManagerTest {
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
 
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
-    val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
-    ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
-
     zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(Topic.GroupMetadataTopicName))).andReturn(ret)
+    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2))
     EasyMock.replay(zkUtils)
 
     time = new MockTime

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala
new file mode 100644
index 0000000..da9ec47
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala
@@ -0,0 +1,105 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.coordinator
+
+import kafka.common.KafkaException
+import kafka.utils.ZkUtils
+import org.easymock.{Capture, EasyMock, IAnswer}
+import org.junit.{After, Test}
+import org.junit.Assert._
+
+class ProducerIdManagerTest {
+
+  val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+
+  @After
+  def tearDown(): Unit = {
+    EasyMock.reset(zkUtils)
+  }
+
+  @Test
+  def testGetPID() {
+    var zkVersion: Int = -1
+    var data: String = null
+    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
+      .andAnswer(new IAnswer[(Option[String], Int)] {
+        override def answer(): (Option[String], Int) = {
+          if (zkVersion == -1) {
+            (None.asInstanceOf[Option[String]], 0)
+          } else {
+            (Some(data), zkVersion)
+          }
+        }
+      })
+      .anyTimes()
+
+    val capturedVersion: Capture[Int] = EasyMock.newCapture()
+    val capturedData: Capture[String] = EasyMock.newCapture()
+    EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(),
+      EasyMock.capture(capturedData),
+      EasyMock.capture(capturedVersion),
+      EasyMock.anyObject().asInstanceOf[Option[(ZkUtils, String, String) => (Boolean,Int)]]))
+      .andAnswer(new IAnswer[(Boolean, Int)] {
+        override def answer(): (Boolean, Int) = {
+          zkVersion = capturedVersion.getValue + 1
+          data = capturedData.getValue
+
+          (true, zkVersion)
+        }
+      })
+      .anyTimes()
+
+    EasyMock.replay(zkUtils)
+
+    val manager1: ProducerIdManager = new ProducerIdManager(0, zkUtils)
+    val manager2: ProducerIdManager = new ProducerIdManager(1, zkUtils)
+
+    val pid1 = manager1.nextPid()
+    val pid2 = manager2.nextPid()
+
+    assertEquals(0, pid1)
+    assertEquals(ProducerIdManager.PidBlockSize, pid2)
+
+    for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
+      assertEquals(pid1 + i, manager1.nextPid())
+    }
+
+    for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
+      assertEquals(pid2 + i, manager2.nextPid())
+    }
+
+    assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.nextPid())
+    assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.nextPid())
+  }
+
+  @Test(expected = classOf[KafkaException])
+  def testExceedPIDLimit() {
+    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
+      .andAnswer(new IAnswer[(Option[String], Int)] {
+        override def answer(): (Option[String], Int) = {
+          (Some(ProducerIdManager.generatePidBlockJson(ProducerIdBlock(0,
+            Long.MaxValue - ProducerIdManager.PidBlockSize,
+            Long.MaxValue))), 0)
+        }
+      })
+      .anyTimes()
+    EasyMock.replay(zkUtils)
+    new ProducerIdManager(0, zkUtils)
+  }
+}
+

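The assertions in testGetPID above encode the block-allocation contract: each ProducerIdManager claims a contiguous block of PidBlockSize ids, and a new block is claimed only when the current one is exhausted. Below is a simplified in-memory sketch of that contract; the shared AtomicLong stands in for the conditional ZooKeeper update used by the real ProducerIdManager, and all names here are illustrative only:

import java.util.concurrent.atomic.AtomicLong

object BlockSource {
  // Stand-in for the ZooKeeper-backed counter of allocated blocks.
  private val nextBlockStart = new AtomicLong(0L)
  def claimBlock(blockSize: Long): Long = nextBlockStart.getAndAdd(blockSize)
}

class SketchPidManager(blockSize: Long) {
  private var nextPidValue: Long = -1L
  private var blockEnd: Long = -1L // exclusive upper bound of the current block

  def nextPid(): Long = {
    if (nextPidValue == -1L || nextPidValue >= blockEnd) {
      // Current block exhausted (or never claimed): claim a fresh block.
      val blockStart = BlockSource.claimBlock(blockSize)
      nextPidValue = blockStart
      blockEnd = blockStart + blockSize
    }
    val pid = nextPidValue
    nextPidValue += 1
    pid
  }
}

// Mirrors the test above: two managers receive disjoint blocks starting at 0 and blockSize.
object SketchPidManagerUsage extends App {
  val m1 = new SketchPidManager(1000)
  val m2 = new SketchPidManager(1000)
  assert(m1.nextPid() == 0L)
  assert(m2.nextPid() == 1000L)
}
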
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala
new file mode 100644
index 0000000..f8ef5dc
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala
@@ -0,0 +1,93 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.coordinator
+
+import kafka.utils.ZkUtils
+import org.apache.kafka.common.protocol.Errors
+import org.easymock.{Capture, EasyMock, IAnswer}
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+
+class TransactionCoordinatorTest {
+
+  val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+
+  var zkVersion: Int = -1
+  var data: String = null
+  val capturedVersion: Capture[Int] = EasyMock.newCapture()
+  val capturedData: Capture[String] = EasyMock.newCapture()
+  EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
+    .andAnswer(new IAnswer[(Option[String], Int)] {
+      override def answer(): (Option[String], Int) = {
+        if (zkVersion == -1) {
+          (None.asInstanceOf[Option[String]], 0)
+        } else {
+          (Some(data), zkVersion)
+        }
+      }
+    })
+    .anyTimes()
+
+  EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(),
+    EasyMock.capture(capturedData),
+    EasyMock.capture(capturedVersion),
+    EasyMock.anyObject().asInstanceOf[Option[(ZkUtils, String, String) => (Boolean,Int)]]))
+    .andAnswer(new IAnswer[(Boolean, Int)] {
+      override def answer(): (Boolean, Int) = {
+        zkVersion = capturedVersion.getValue + 1
+        data = capturedData.getValue
+
+        (true, zkVersion)
+      }
+    })
+    .anyTimes()
+
+  EasyMock.replay(zkUtils)
+
+  val pidManager: ProducerIdManager = new ProducerIdManager(0, zkUtils)
+  val coordinator: TransactionCoordinator = new TransactionCoordinator(0, pidManager)
+
+  var result: InitPidResult = null
+
+  @Before
+  def setUp(): Unit = {
+    coordinator.startup()
+  }
+
+  @After
+  def tearDown(): Unit = {
+    EasyMock.reset(zkUtils)
+    coordinator.shutdown()
+  }
+
+  @Test
+  def testHandleInitPid() = {
+    coordinator.handleInitPid("", initPidMockCallback)
+    assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
+
+    coordinator.handleInitPid("", initPidMockCallback)
+    assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
+
+    coordinator.handleInitPid(null, initPidMockCallback)
+    assertEquals(InitPidResult(2L, 0, Errors.NONE), result)
+  }
+
+  def initPidMockCallback(ret: InitPidResult): Unit = {
+    result = ret
+  }
+}

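handleInitPid, as exercised above, is callback-based: the InitPidResult is delivered to the supplied function rather than returned. A small sketch, assuming the callback parameter is simply InitPidResult => Unit as the test's initPidMockCallback signature suggests, of adapting it to a Future for callers that prefer one:

import scala.concurrent.{Future, Promise}

// Assumed to live alongside the kafka.coordinator classes used in the test above.
def initPidAsFuture(coordinator: TransactionCoordinator, transactionalId: String): Future[InitPidResult] = {
  val promise = Promise[InitPidResult]()
  coordinator.handleInitPid(transactionalId, result => promise.success(result))
  promise.future
}
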
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 5f97708..49faa85 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -53,7 +53,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
     val logProps = new Properties()
     logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
     /*configure broker-side compression  */
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
     /* append two messages */
     log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 3e91f96..2104842 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -322,7 +322,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
 
-      val log = new Log(dir = dir,
+      val log = new Log(dir,
                         LogConfig(logConfigProperties(propertyOverrides, maxMessageSize, minCleanableDirtyRatio)),
                         logStartOffset = 0L,
                         recoveryPoint = 0L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 05d9060..2cfcc07 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -149,7 +149,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
       logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
       logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
 
-      val log = new Log(dir = dir,
+      val log = new Log(dir,
         LogConfig(logProps),
         logStartOffset = 0L,
         recoveryPoint = 0L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 94207ec..e933c87 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -185,8 +185,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      time.scheduler,
-      time)
+      scheduler = time.scheduler,
+      time = time)
     log
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 38eb94c..928b03d 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -174,6 +174,27 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testLogCleanerRetainsLastWrittenRecordForEachPid(): Unit = {
+    val cleaner = makeCleaner(10)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    log.append(record(0, 0)) // offset 0
+    log.append(record(0, 1, pid = 1, epoch = 0, sequence = 0)) // offset 1
+    log.append(record(0, 2, pid = 2, epoch = 0, sequence = 0)) // offset 2
+    log.append(record(0, 3, pid = 3, epoch = 0, sequence = 0)) // offset 3
+    log.append(record(1, 1, pid = 2, epoch = 0, sequence = 1)) // offset 4
+
+    // roll the segment, so we can clean the messages already appended
+    log.roll()
+
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    assertEquals(immutable.List(0, 0, 1), keysInLog(log))
+    assertEquals(immutable.List(1, 3, 4), offsetsInLog(log))
+  }
+
+  @Test
   def testPartialSegmentClean(): Unit = {
     // because loadFactor is 0.75, this means we can fit 2 messages in the map
     val cleaner = makeCleaner(2)
@@ -796,8 +817,12 @@ class LogCleanerTest extends JUnitSuite {
 
   def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
 
-  def record(key: Int, value: Int): MemoryRecords =
-    record(key, value.toString.getBytes)
+
+  def record(key: Int, value: Int, pid: Long = RecordBatch.NO_PRODUCER_ID, epoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
+             sequence: Int = RecordBatch.NO_SEQUENCE): MemoryRecords = {
+    MemoryRecords.withRecords(0L, CompressionType.NONE, pid, epoch, sequence,
+      new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
+  }
 
   def record(key: Int, value: Array[Byte]) =
     TestUtils.singletonRecords(key = key.toString.getBytes, value = value)

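The new record(...) overload above builds a single-record batch carrying a producer id, epoch, and sequence. For reference, the same MemoryRecords.withRecords call used directly, constructing the two batches the cleaner test appends for pid 2 (sequences 0 and 1, landing at offsets 2 and 4 in the test):

import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

val pid = 2L
val epoch: Short = 0
// sequence 0: key "0", value "2" (appended at offset 2 in the test)
val firstBatch = MemoryRecords.withRecords(0L, CompressionType.NONE, pid, epoch, 0,
  new SimpleRecord("0".getBytes, "2".getBytes))
// sequence 1: key "1", value "1" (appended at offset 4 in the test)
val secondBatch = MemoryRecords.withRecords(0L, CompressionType.NONE, pid, epoch, 1,
  new SimpleRecord("1".getBytes, "1".getBytes))
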
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a8e953a..1400615 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -102,7 +102,7 @@ class LogManagerTest {
     time.sleep(maxLogAgeMs + 1)
     assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
-    assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
+    assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes)
 
     try {
@@ -148,7 +148,7 @@ class LogManagerTest {
     time.sleep(logManager.InitialTaskDelayMs)
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
-    assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
+    assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes)
     try {
       log.read(0, 1024)

