kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] kafka git commit: MINOR: Rename InitPidRequest/InitPidResponse to InitProducerIdRequest/InitProducerIdResponse
Date Sat, 13 May 2017 03:02:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1cb39f757 -> a1c8e7d94


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 4b2cedb..d7b1c33 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -73,28 +73,28 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry:
Produc
   private var coordinatorEpoch = initialEntry.coordinatorEpoch
   private val transactions = ListBuffer.empty[TxnMetadata]
 
-  def this(pid: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) =
-    this(pid, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
+  def this(producerId: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean)
=
+    this(producerId, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
 
-  private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers:
Boolean) = {
-    if (this.producerEpoch > epoch) {
+  private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers:
Boolean) = {
+    if (this.producerEpoch > producerEpoch) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably
another producer " +
-        s"with a newer epoch. $epoch (request epoch), ${this.producerEpoch} (server epoch)")
+        s"with a newer epoch. $producerEpoch (request epoch), ${this.producerEpoch} (server
epoch)")
     } else if (shouldValidateSequenceNumbers) {
-      if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch <
epoch) {
+      if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch <
producerEpoch) {
         if (firstSeq != 0)
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch:
$epoch " +
+          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch:
$producerEpoch " +
             s"(request epoch), $firstSeq (seq. number)")
       } else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
         // the epoch was bumped by a control record, so we expect the sequence number to
be reset
-        throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId
(pid), found $firstSeq " +
+        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId
$producerId: found $firstSeq " +
           s"(incoming seq. number), but expected 0")
       } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
-        throw new DuplicateSequenceNumberException(s"Duplicate sequence number: pid: $producerId,
(incomingBatch.firstSeq, " +
+        throw new DuplicateSequenceNumberException(s"Duplicate sequence number for producerId
$producerId: (incomingBatch.firstSeq, " +
           s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), (lastEntry.firstSeq, lastEntry.lastSeq):
" +
           s"(${this.firstSeq}, ${this.lastSeq}).")
       } else if (firstSeq != this.lastSeq + 1L) {
-        throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId
(pid), $firstSeq " +
+        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId
$producerId: $firstSeq " +
           s"(incoming seq. number), ${this.lastSeq} (current end sequence number)")
       }
     }
@@ -202,25 +202,25 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry:
Produc
 }
 
 object ProducerStateManager {
-  private val PidSnapshotVersion: Short = 1
+  private val ProducerSnapshotVersion: Short = 1
   private val VersionField = "version"
   private val CrcField = "crc"
-  private val PidField = "pid"
+  private val ProducerIdField = "producer_id"
   private val LastSequenceField = "last_sequence"
   private val ProducerEpochField = "epoch"
   private val LastOffsetField = "last_offset"
   private val OffsetDeltaField = "offset_delta"
   private val TimestampField = "timestamp"
-  private val PidEntriesField = "pid_entries"
+  private val ProducerEntriesField = "producer_entries"
   private val CoordinatorEpochField = "coordinator_epoch"
   private val CurrentTxnFirstOffsetField = "current_txn_first_offset"
 
   private val VersionOffset = 0
   private val CrcOffset = VersionOffset + 2
-  private val PidEntriesOffset = CrcOffset + 4
+  private val ProducerEntriesOffset = CrcOffset + 4
 
-  val PidSnapshotEntrySchema = new Schema(
-    new Field(PidField, Type.INT64, "The producer ID"),
+  val ProducerSnapshotEntrySchema = new Schema(
+    new Field(ProducerIdField, Type.INT64, "The producer ID"),
     new Field(ProducerEpochField, Type.INT16, "Current epoch of the producer"),
     new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"),
     new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"),
@@ -231,33 +231,33 @@ object ProducerStateManager {
   val PidSnapshotMapSchema = new Schema(
     new Field(VersionField, Type.INT16, "Version of the snapshot file"),
     new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
-    new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID
table"))
+    new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), "The entries
in the producer table"))
 
   def readSnapshot(file: File): Iterable[ProducerIdEntry] = {
     val buffer = Files.readAllBytes(file.toPath)
     val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
 
     val version = struct.getShort(VersionField)
-    if (version != PidSnapshotVersion)
+    if (version != ProducerSnapshotVersion)
       throw new IllegalArgumentException(s"Unhandled snapshot file version $version")
 
     val crc = struct.getUnsignedInt(CrcField)
-    val computedCrc =  Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset)
+    val computedCrc =  Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset)
     if (crc != computedCrc)
       throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no
longer valid). " +
         s"Stored crc: $crc. Computed crc: $computedCrc")
 
-    struct.getArray(PidEntriesField).map { pidEntryObj =>
-      val pidEntryStruct = pidEntryObj.asInstanceOf[Struct]
-      val pid: Long = pidEntryStruct.getLong(PidField)
-      val epoch = pidEntryStruct.getShort(ProducerEpochField)
-      val seq = pidEntryStruct.getInt(LastSequenceField)
-      val offset = pidEntryStruct.getLong(LastOffsetField)
-      val timestamp = pidEntryStruct.getLong(TimestampField)
-      val offsetDelta = pidEntryStruct.getInt(OffsetDeltaField)
-      val coordinatorEpoch = pidEntryStruct.getInt(CoordinatorEpochField)
-      val currentTxnFirstOffset = pidEntryStruct.getLong(CurrentTxnFirstOffsetField)
-      val newEntry = ProducerIdEntry(pid, epoch, seq, offset, offsetDelta, timestamp,
+    struct.getArray(ProducerEntriesField).map { producerEntryObj =>
+      val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
+      val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
+      val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
+      val seq = producerEntryStruct.getInt(LastSequenceField)
+      val offset = producerEntryStruct.getLong(LastOffsetField)
+      val timestamp = producerEntryStruct.getLong(TimestampField)
+      val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
+      val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
+      val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
+      val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta,
timestamp,
         coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset)
else None)
       newEntry
     }
@@ -265,12 +265,12 @@ object ProducerStateManager {
 
   private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) {
     val struct = new Struct(PidSnapshotMapSchema)
-    struct.set(VersionField, PidSnapshotVersion)
+    struct.set(VersionField, ProducerSnapshotVersion)
     struct.set(CrcField, 0L) // we'll fill this after writing the entries
     val entriesArray = entries.map {
-      case (pid, entry) =>
-        val pidEntryStruct = struct.instance(PidEntriesField)
-        pidEntryStruct.set(PidField, pid)
+      case (producerId, entry) =>
+        val producerEntryStruct = struct.instance(ProducerEntriesField)
+        producerEntryStruct.set(ProducerIdField, producerId)
           .set(ProducerEpochField, entry.producerEpoch)
           .set(LastSequenceField, entry.lastSeq)
           .set(LastOffsetField, entry.lastOffset)
@@ -278,16 +278,16 @@ object ProducerStateManager {
           .set(TimestampField, entry.timestamp)
           .set(CoordinatorEpochField, entry.coordinatorEpoch)
           .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
-        pidEntryStruct
+        producerEntryStruct
     }.toArray
-    struct.set(PidEntriesField, entriesArray)
+    struct.set(ProducerEntriesField, entriesArray)
 
     val buffer = ByteBuffer.allocate(struct.sizeOf)
     struct.writeTo(buffer)
     buffer.flip()
 
     // now fill in the CRC
-    val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset)
+    val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit - ProducerEntriesOffset)
     ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
 
     val fos = new FileOutputStream(file)
@@ -404,10 +404,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
   // visible for testing
   private[log] def loadProducerEntry(entry: ProducerIdEntry): Unit = {
-    val pid = entry.producerId
-    producers.put(pid, entry)
+    val producerId = entry.producerId
+    producers.put(producerId, entry)
     entry.currentTxnFirstOffset.foreach { offset =>
-      ongoingTxns.put(offset, new TxnMetadata(pid, offset))
+      ongoingTxns.put(offset, new TxnMetadata(producerId, offset))
     }
   }
 
@@ -418,7 +418,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Expire any PIDs which have been idle longer than the configured maximum expiration timeout.
    */
   def removeExpiredProducers(currentTimeMs: Long) {
-    producers.retain { case (pid, lastEntry) =>
+    producers.retain { case (producerId, lastEntry) =>
       !isExpired(currentTimeMs, lastEntry)
     }
   }
@@ -496,7 +496,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
   /**
    * When we remove the head of the log due to retention, we need to clean up the id map.
This method takes
-   * the new start offset and removes all pids which have a smaller last written offset.
+   * the new start offset and removes all producerIds which have a smaller last written offset.
    */
   def evictUnretainedProducers(logStartOffset: Long) {
     val evictedProducerEntries = producers.filter(_._2.lastOffset < logStartOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index aaa2458..1f8bea5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -32,7 +32,7 @@ import kafka.common.Topic.{GroupMetadataTopicName, TransactionStateTopicName,
is
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
-import kafka.coordinator.transaction.{InitPidResult, TransactionCoordinator}
+import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.{RequestChannel, RequestOrResponseSend}
 import kafka.network.RequestChannel.{Response, Session}
@@ -110,7 +110,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
         case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
         case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
-        case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request)
+        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
         case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
         case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
@@ -1386,20 +1386,20 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleInitPidRequest(request: RequestChannel.Request): Unit = {
-    val initPidRequest = request.body[InitPidRequest]
-    val transactionalId = initPidRequest.transactionalId
+  def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
+    val initProducerIdRequest = request.body[InitProducerIdRequest]
+    val transactionalId = initProducerIdRequest.transactionalId
 
     // Send response callback
-    def sendResponseCallback(result: InitPidResult): Unit = {
+    def sendResponseCallback(result: InitProducerIdResult): Unit = {
       def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val responseBody: InitPidResponse = new InitPidResponse(throttleTimeMs, result.error,
result.pid, result.epoch)
-        trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result
from client ${request.header.clientId}.")
+        val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId,
result.producerEpoch)
+        trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from
client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
     }
-    txnCoordinator.handleInitPid(transactionalId, initPidRequest.transactionTimeoutMs, sendResponseCallback)
+    txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs,
sendResponseCallback)
   }
 
   def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
@@ -1408,7 +1408,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(error: Errors) {
       def createResponse(throttleTimeMs: Int): AbstractResponse = {
         val responseBody = new EndTxnResponse(throttleTimeMs, error)
-        trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command:
${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
+        trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command:
${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
@@ -1433,23 +1433,22 @@ class KafkaApis(val requestChannel: RequestChannel,
       return
     }
 
-    def sendResponseCallback(pid: Long, result: TransactionResult)(responseStatus: Map[TopicPartition,
PartitionResponse]): Unit = {
-      errors.put(pid, responseStatus.mapValues(_.error).asJava)
+    def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus:
Map[TopicPartition, PartitionResponse]): Unit = {
+      errors.put(producerId, responseStatus.mapValues(_.error).asJava)
 
       val successfulPartitions = responseStatus.filter { case (_, partitionResponse) =>
         partitionResponse.error == Errors.NONE
       }.keys.toSeq
 
       try {
-        groupCoordinator.handleTxnCompletion(producerId = pid, topicPartitions = successfulPartitions,
transactionResult = result)
+        groupCoordinator.handleTxnCompletion(producerId = producerId, topicPartitions = successfulPartitions,
transactionResult = result)
       } catch {
         case e: Exception =>
           error(s"Received an exception while trying to update the offsets cache on transaction
completion: $e")
-          val producerIdErrors = errors.get(pid)
+          val producerIdErrors = errors.get(producerId)
           successfulPartitions.foreach(producerIdErrors.put(_, Errors.UNKNOWN))
       }
 
-
       if (numAppends.decrementAndGet() == 0)
         sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 690d167..5ee4b12 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -590,7 +590,7 @@ object KafkaConfig {
   /** ********* Transaction management configuration ***********/
   val TransactionalIdExpirationMsDoc = "The maximum amount of time in ms that the transaction
coordinator will wait before proactively expire a producer's transactional id without receiving
any transaction status updates from it."
   val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
-    "If a client’s requested transaction time exceed this, then the broker will return
an error in InitPidRequest. This prevents a client from too large of a timeout, which can
stall consumers reading from topics included in the transaction."
+    "If a client’s requested transaction time exceed this, then the broker will return
an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which
can stall consumers reading from topics included in the transaction."
   val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the
transaction topic."
   val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments
when loading pid and transactions into the cache."
   val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction
topic (set higher to ensure availability). " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4e2b11a..c12f774 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -64,7 +64,7 @@ object ZkUtils {
   val BrokerSequenceIdPath = s"$BrokersPath/seqid"
   val ConfigChangesPath = s"$ConfigPath/changes"
   val ConfigUsersPath = s"$ConfigPath/users"
-  val PidBlockPath = "/latest_pid_block"
+  val ProducerIdBlockPath = "/latest_pid_block"
   // Important: it is necessary to add any new top level Zookeeper path to the Seq
   val SecureZkRootPaths = Seq(AdminPath,
                               BrokersPath,
@@ -75,7 +75,7 @@ object ZkUtils {
                               IsrChangeNotificationPath,
                               KafkaAclPath,
                               KafkaAclChangesPath,
-                              PidBlockPath)
+                              ProducerIdBlockPath)
 
   // Important: it is necessary to add any new top level Zookeeper path that contains
   //            sensitive information that should not be world readable to the Seq
@@ -239,7 +239,7 @@ class ZkUtils(val zkClient: ZkClient,
                               DeleteTopicsPath,
                               BrokerSequenceIdPath,
                               IsrChangeNotificationPath,
-                              PidBlockPath)
+                              ProducerIdBlockPath)
 
   // Visible for testing
   val zkPath = new ZkPath(zkClient)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 85c631c..b032f8d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -68,22 +68,22 @@ class ProducerIdManagerTest {
     val manager1: ProducerIdManager = new ProducerIdManager(0, zkUtils)
     val manager2: ProducerIdManager = new ProducerIdManager(1, zkUtils)
 
-    val pid1 = manager1.nextPid()
-    val pid2 = manager2.nextPid()
+    val pid1 = manager1.generateProducerId()
+    val pid2 = manager2.generateProducerId()
 
     assertEquals(0, pid1)
     assertEquals(ProducerIdManager.PidBlockSize, pid2)
 
     for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
-      assertEquals(pid1 + i, manager1.nextPid())
+      assertEquals(pid1 + i, manager1.generateProducerId())
     }
 
     for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
-      assertEquals(pid2 + i, manager2.nextPid())
+      assertEquals(pid2 + i, manager2.generateProducerId())
     }
 
-    assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.nextPid())
-    assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.nextPid())
+    assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.generateProducerId())
+    assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.generateProducerId())
   }
 
   @Test(expected = classOf[KafkaException])
@@ -91,7 +91,7 @@ class ProducerIdManagerTest {
     EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
       .andAnswer(new IAnswer[(Option[String], Int)] {
         override def answer(): (Option[String], Int) = {
-          (Some(ProducerIdManager.generatePidBlockJson(ProducerIdBlock(0,
+          (Some(ProducerIdManager.generateProducerIdBlockJson(ProducerIdBlock(0,
             Long.MaxValue - ProducerIdManager.PidBlockSize,
             Long.MaxValue))), 0)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
index df23952..83cba71 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
@@ -47,19 +47,19 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness
{
 
     val tc = servers.head.transactionCoordinator
 
-    var initPidResult: InitPidResult = null
-    def callback(result: InitPidResult): Unit = {
-      initPidResult = result
+    var initProducerIdResult: InitProducerIdResult = null
+    def callback(result: InitProducerIdResult): Unit = {
+      initProducerIdResult = result
     }
 
     val txnId = "txn"
-    tc.handleInitPid(txnId, 900000, callback)
+    tc.handleInitProducerId(txnId, 900000, callback)
 
-    while(initPidResult == null) {
+    while(initProducerIdResult == null) {
       Utils.sleep(1)
     }
 
-    Assert.assertEquals(Errors.NONE, initPidResult.error)
+    Assert.assertEquals(Errors.NONE, initProducerIdResult.error)
 
     @volatile var addPartitionErrors: Errors = null
     def addPartitionsCallback(errors: Errors): Unit = {
@@ -67,8 +67,8 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness
{
     }
 
     tc.handleAddPartitionsToTransaction(txnId,
-      initPidResult.pid,
-      initPidResult.epoch,
+      initProducerIdResult.producerId,
+      initProducerIdResult.producerEpoch,
       Set[TopicPartition](new TopicPartition(topic, 0)),
       addPartitionsCallback
     )

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 395bfb9..2f4f572 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -58,11 +58,11 @@ class TransactionCoordinatorTest {
     txnMarkerPurgatory,
     time)
 
-  var result: InitPidResult = _
+  var result: InitProducerIdResult = _
   var error: Errors = Errors.NONE
 
   private def mockPidManager(): Unit = {
-    EasyMock.expect(pidManager.nextPid())
+    EasyMock.expect(pidManager.generateProducerId())
       .andAnswer(new IAnswer[Long] {
         override def answer(): Long = {
           nextPid += 1
@@ -90,10 +90,10 @@ class TransactionCoordinatorTest {
     mockPidManager()
     EasyMock.replay(pidManager)
 
-    coordinator.handleInitPid("", txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
-    coordinator.handleInitPid("", txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result)
   }
 
   @Test
@@ -101,10 +101,10 @@ class TransactionCoordinatorTest {
     mockPidManager()
     EasyMock.replay(pidManager)
 
-    coordinator.handleInitPid(null, txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
-    coordinator.handleInitPid(null, txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId(null, txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId(null, txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result)
   }
 
   @Test
@@ -143,16 +143,16 @@ class TransactionCoordinatorTest {
       .anyTimes()
     EasyMock.replay(pidManager, transactionManager)
 
-    coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(nextPid - 1, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(nextPid - 1, 0, Errors.NONE), result)
   }
 
   @Test
   def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinatorForId(): Unit = {
     mockPidManager()
     EasyMock.replay(pidManager)
-    coordinator.handleInitPid("some-pid", txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(-1, -1, Errors.NOT_COORDINATOR), result)
+    coordinator.handleInitProducerId("some-pid", txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.NOT_COORDINATOR), result)
   }
 
   @Test
@@ -165,7 +165,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback)
-    assertEquals(Errors.INVALID_PID_MAPPING, error)
+    assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
   }
 
   @Test
@@ -299,7 +299,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     coordinator.handleEndTransaction(transactionalId, 0, 0, TransactionResult.COMMIT, errorsCallback)
-    assertEquals(Errors.INVALID_PID_MAPPING, error)
+    assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
     EasyMock.verify(transactionManager)
   }
 
@@ -312,7 +312,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     coordinator.handleEndTransaction(transactionalId, 0, 0, TransactionResult.COMMIT, errorsCallback)
-    assertEquals(Errors.INVALID_PID_MAPPING, error)
+    assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
     EasyMock.verify(transactionManager)
   }
 
@@ -513,9 +513,9 @@ class TransactionCoordinatorTest {
 
     EasyMock.replay(transactionManager, transactionMarkerChannelManager)
 
-    coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback)
+    coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, initProducerIdMockCallback)
 
-    assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
     EasyMock.verify(transactionManager)
   }
 
@@ -568,7 +568,7 @@ class TransactionCoordinatorTest {
 
     EasyMock.expect(transactionManager.transactionsToExpire())
       .andReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, pid, epoch)))
-    
+
     EasyMock.replay(transactionManager, transactionMarkerChannelManager)
 
     coordinator.startup(false)
@@ -589,9 +589,9 @@ class TransactionCoordinatorTest {
 
     EasyMock.replay(transactionManager)
 
-    coordinator.handleInitPid(transactionalId, 10, initPidMockCallback)
+    coordinator.handleInitProducerId(transactionalId, 10, initProducerIdMockCallback)
 
-    assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
   }
 
   private def validateIncrementEpochAndUpdateMetadata(state: TransactionState) = {
@@ -620,9 +620,9 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     val newTxnTimeoutMs = 10
-    coordinator.handleInitPid(transactionalId, newTxnTimeoutMs, initPidMockCallback)
+    coordinator.handleInitProducerId(transactionalId, newTxnTimeoutMs, initProducerIdMockCallback)
 
-    assertEquals(InitPidResult(pid, (epoch + 1).toShort, Errors.NONE), result)
+    assertEquals(InitProducerIdResult(pid, (epoch + 1).toShort, Errors.NONE), result)
     assertEquals(newTxnTimeoutMs, metadata.txnTimeoutMs)
     assertEquals(time.milliseconds(), metadata.txnLastUpdateTimestamp)
     assertEquals((epoch + 1).toShort, metadata.producerEpoch)
@@ -704,7 +704,7 @@ class TransactionCoordinatorTest {
     completedMetadata
   }
 
-  def initPidMockCallback(ret: InitPidResult): Unit = {
+  def initProducerIdMockCallback(ret: InitProducerIdResult): Unit = {
     result = ret
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 9270544..425b9f1 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -252,7 +252,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
 
         case ApiKeys.INIT_PRODUCER_ID =>
-          new InitPidRequest.Builder("abc")
+          new InitProducerIdRequest.Builder("abc")
 
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
           new OffsetsForLeaderEpochRequest.Builder().add(tp, 0)
@@ -353,7 +353,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.CREATE_TOPICS => new CreateTopicsResponse(response).throttleTimeMs
       case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs
       case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
-      case ApiKeys.INIT_PRODUCER_ID => new InitPidResponse(response).throttleTimeMs
+      case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs
       case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs
       case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs
       case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs


Mime
View raw message