kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: KAFKA-1522; Transactional messaging request/response definitions; reviewed by Joel Koshy
Date Fri, 15 Aug 2014 18:15:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/transactional_messaging 7a67a7226 -> 45c4de00d


KAFKA-1522; Transactional messaging request/response definitions; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45c4de00
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45c4de00
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45c4de00

Branch: refs/heads/transactional_messaging
Commit: 45c4de00daa0c20cb7718e851f65e46074b0c247
Parents: 7a67a72
Author: Dong Lin <lindong28@gmail.com>
Authored: Fri Aug 15 11:08:55 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Aug 15 11:08:55 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/RequestKeys.scala |   6 +-
 .../scala/kafka/api/TransactionRequest.scala    | 184 +++++++++++++++++++
 .../scala/kafka/api/TransactionResponse.scala   |  97 ++++++++++
 .../api/TxCoordinatorMetadataRequest.scala      |  79 ++++++++
 .../api/TxCoordinatorMetadataResponse.scala     |  59 ++++++
 .../main/scala/kafka/common/ErrorMapping.scala  |   1 +
 .../api/RequestResponseSerializationTest.scala  |  31 +++-
 7 files changed, 455 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index c24c034..4a9b174 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -34,6 +34,8 @@ object RequestKeys {
   val ConsumerMetadataKey: Short = 10
   val JoinGroupKey: Short = 11
   val HeartbeatKey: Short = 12
+  val TransactionKey: Short = 13
+  val TxCoordinatorMetadataKey: Short = 14
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -48,7 +50,9 @@ object RequestKeys {
         OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
         ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
         JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
-        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
+        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom),
+        TransactionKey -> ("TransactionRequest", TransactionRequest.readFrom),
+        TxCoordinatorMetadataKey -> ("TxCoordinatorMetadata", TxCoordinatorMetadataRequest.readFrom)
     )
 
   def nameForKey(key: Short): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/api/TransactionRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TransactionRequest.scala b/core/src/main/scala/kafka/api/TransactionRequest.scala
new file mode 100644
index 0000000..29df42c
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TransactionRequest.scala
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio._
+import kafka.api.ApiUtils._
+import kafka.common._
+import kafka.network.RequestChannel.Response
+import kafka.network.{RequestChannel, BoundedByteBufferSend}
+import collection.mutable.{LinkedHashMap, LinkedHashSet}
+
+
+object TransactionRequest {
+  val CurrentVersion = 0.shortValue
+
+  def readFrom(buffer: ByteBuffer): TransactionRequest = {
+    val versionId: Short = buffer.getShort
+    val correlationId: Int = buffer.getInt
+    val clientId: String = readShortString(buffer)
+    val ackTimeoutMs: Int = buffer.getInt
+    val requestInfo = TransactionRequestInfo.readFrom(buffer)
+
+    TransactionRequest(versionId, correlationId, clientId, ackTimeoutMs, requestInfo)
+  }
+
+  def transactionRequestWithNewControl(oldTxRequest: TransactionRequest, newTxControl: Short):
TransactionRequest = {
+    oldTxRequest.copy(requestInfo = oldTxRequest.requestInfo.copy(txControl = newTxControl))
+  }
+}
+
+object TxRequestTypes {
+  val Ongoing: Short = 0
+  val Begin: Short = 1
+  val PreCommit: Short = 2
+  val Commit: Short = 3
+  val Committed: Short = 4
+  val PreAbort: Short = 5
+  val Abort: Short = 6
+  val Aborted: Short = 7
+}
+
+
+case class TransactionRequest(versionId: Short = TransactionRequest.CurrentVersion,
+                              correlationId: Int,
+                              clientId: String,
+                              ackTimeoutMs: Int,
+                              requestInfo: TransactionRequestInfo)
+    extends RequestOrResponse(Some(RequestKeys.TransactionKey)) {
+
+  def this(correlationId: Int,
+           clientId: String,
+           ackTimeoutMs: Int,
+           requestInfo: TransactionRequestInfo) =
+    this(TransactionRequest.CurrentVersion, correlationId, clientId, ackTimeoutMs, requestInfo)
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+    buffer.putInt(ackTimeoutMs)
+    requestInfo.writeTo(buffer)
+  }
+
+  def sizeInBytes: Int = {
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) + /* client id */
+    4 + /* ackTimeoutMs */
+    requestInfo.sizeInBytes
+  }
+
+  override def toString(): String = {
+    describe(true)
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {
+
+    val transactionResponseStatus = requestInfo.txPartitions.map {
+      topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }
+    val errorResponse = TransactionResponse(correlationId, requestInfo.txId, transactionResponseStatus.toMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  def responseFor(status: Map[TopicAndPartition, Short]) = {
+    TransactionResponse(correlationId, requestInfo.txId, status);
+  }
+
+  override def describe(details: Boolean): String = {
+    val transactionRequest = new StringBuilder
+    transactionRequest.append("Name: " + this.getClass.getSimpleName)
+    transactionRequest.append("; Version: " + versionId)
+    transactionRequest.append("; CorrelationId: " + correlationId)
+    transactionRequest.append("; ClientId: " + clientId)
+    transactionRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
+    if (details)
+      transactionRequest.append("; requestInfos: " + requestInfo)
+    transactionRequest.toString()
+  }
+}
+
+object TransactionRequestInfo {
+
+  def readFrom(buffer: ByteBuffer): TransactionRequestInfo = {
+    val txGroupId: String = readShortString(buffer)
+    val txId: Int = buffer.getInt
+    val txControl: Short = buffer.getShort
+    val txTimeoutMs: Int = buffer.getInt
+
+    val topicCount = buffer.getInt
+    val txPartitions = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partition = buffer.getInt
+        TopicAndPartition(topic, partition)
+      })
+    }).toList
+
+    TransactionRequestInfo(txGroupId, txId, txControl, txTimeoutMs, txPartitions)
+  }
+}
+
+case class TransactionRequestInfo(txGroupId: String, txId: Int, txControl: Short, txTimeoutMs:
Int,
+                                  txPartitions: Seq[TopicAndPartition]) {
+
+  private lazy val partitionsGroupedByTopic = txPartitions.groupBy(_.topic)
+
+  def sizeInBytes: Int = {
+    shortStringLength(txGroupId) + /* groupId */
+    4 + /* txId */
+    2 + /* txControl */
+    4 + /* txTimeoutMs */
+    4 + /* number of topics */
+    partitionsGroupedByTopic.foldLeft(0)((foldedTopics, topicAndPartitions) => {
+      foldedTopics +
+      shortStringLength(topicAndPartitions._1) + /* topic */
+      4 + /* number of partitions */
+      4 * topicAndPartitions._2.size /* partitions */
+    })
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    writeShortString(buffer, txGroupId)
+    buffer.putInt(txId)
+    buffer.putShort(txControl)
+    buffer.putInt(txTimeoutMs)
+    buffer.putInt(partitionsGroupedByTopic.size)
+    partitionsGroupedByTopic.foreach {
+      case (topic, topicAndPartitions) =>
+        writeShortString(buffer, topic) //write the topic
+        buffer.putInt(topicAndPartitions.size) //the number of partitions
+        topicAndPartitions.foreach {topicAndPartition: TopicAndPartition =>
+          buffer.putInt(topicAndPartition.partition)
+        }
+    }
+  }
+
+  override def toString(): String = {
+    val requestInfo = new StringBuilder
+    requestInfo.append("gId: " + txGroupId)
+    requestInfo.append("; txId: " + txId)
+    requestInfo.append("; txControl: " + txControl)
+    requestInfo.append("; txTimeoutMs: " + txTimeoutMs)
+    requestInfo.append("; TopicAndPartition: (" + txPartitions.mkString(",") + ")")
+    requestInfo.toString()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/api/TransactionResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TransactionResponse.scala b/core/src/main/scala/kafka/api/TransactionResponse.scala
new file mode 100644
index 0000000..2a889c7
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TransactionResponse.scala
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
+import kafka.common.{TopicAndPartition, ErrorMapping}
+
+import scala.collection.Map
+
+object TransactionResponse {
+  def readFrom(buffer: ByteBuffer): TransactionResponse = {
+
+    val correlationId = buffer.getInt
+    val txId = buffer.getInt
+    val topicCount = buffer.getInt
+    val statusPairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partition = buffer.getInt
+        val error = buffer.getShort
+        (TopicAndPartition(topic, partition), error)
+      })
+    })
+    TransactionResponse(correlationId, txId,  Map(statusPairs:_*))
+  }
+}
+
+
+case class TransactionResponse(correlationId: Int,
+                               txId: Int,
+                               status: Map[TopicAndPartition, Short])
+        extends RequestOrResponse() {
+
+  private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
+
+  def hasError = status.values.exists(_ != ErrorMapping.NoError)
+
+  val sizeInBytes = {
+    val groupedStatus = statusGroupedByTopic
+    4 + /* correlation id */
+    4 + /* transaction id */
+    4 + /* topic count */
+    groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
+      foldedTopics +
+      shortStringLength(currTopic._1) +
+      4 + /* partition count for this topic */
+      currTopic._2.size * {
+        4 + /* partition id */
+        2 /* error code */
+      }
+    })
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    val groupedStatus = statusGroupedByTopic
+    buffer.putInt(correlationId)
+    buffer.putInt(txId)
+    buffer.putInt(groupedStatus.size) // topic count
+
+    groupedStatus.foreach(topicStatus => {
+      val (topic, errors) = topicStatus
+      writeShortString(buffer, topic)
+      buffer.putInt(errors.size) // partition count
+      errors.foreach {
+        case (TopicAndPartition(_, partition), error) =>
+          buffer.putInt(partition)
+          buffer.putShort(error)
+      }
+    })
+  }
+
+  override def toString(): String = {
+    val requestInfo = new StringBuilder
+    requestInfo.append("txId: " + txId)
+    requestInfo.append("; status: " + status)
+    requestInfo.toString()
+  }
+
+  override def describe(details: Boolean):String = { toString }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala b/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala
new file mode 100644
index 0000000..a154c57
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TxCoordinatorMetadataRequest.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+import kafka.common.ErrorMapping
+
+object TxCoordinatorMetadataRequest {
+  val CurrentVersion = 0.shortValue
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer) = {
+    // envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = ApiUtils.readShortString(buffer)
+
+    // request
+    val txGroupId = ApiUtils.readShortString(buffer)
+    TxCoordinatorMetadataRequest(txGroupId, versionId, correlationId, clientId)
+  }
+
+}
+
+case class TxCoordinatorMetadataRequest(txGroupId: String,
+                                      versionId: Short = TxCoordinatorMetadataRequest.CurrentVersion,
+                                      correlationId: Int = 0,
+                                      clientId: String = TxCoordinatorMetadataRequest.DefaultClientId)
+  extends RequestOrResponse(Some(RequestKeys.TxCoordinatorMetadataKey)) {
+
+  def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    ApiUtils.shortStringLength(clientId) +
+    ApiUtils.shortStringLength(txGroupId)
+
+  def writeTo(buffer: ByteBuffer) {
+    // envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    ApiUtils.writeShortString(buffer, clientId)
+
+    // transaction coordinator metadata request
+    ApiUtils.writeShortString(buffer, txGroupId)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {
+    // return TransactionCoordinatorNotAvailable for all uncaught errors
+    val errorResponse = TxCoordinatorMetadataResponse(None, ErrorMapping.TxCoordinatorNotAvailableCode)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  def describe(details: Boolean) = {
+    val transactionMetadataRequest = new StringBuilder
+    transactionMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+    transactionMetadataRequest.append("; Version: " + versionId)
+    transactionMetadataRequest.append("; CorrelationId: " + correlationId)
+    transactionMetadataRequest.append("; ClientId: " + clientId)
+    transactionMetadataRequest.append("; Group: " + txGroupId)
+    transactionMetadataRequest.toString()
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala b/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala
new file mode 100644
index 0000000..052b29b
--- /dev/null
+++ b/core/src/main/scala/kafka/api/TxCoordinatorMetadataResponse.scala
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+
+object TxCoordinatorMetadataResponse {
+  val CurrentVersion = 0
+
+  private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1))
+
+  def readFrom(buffer: ByteBuffer) = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+    val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
+      Some(Broker.readFrom(buffer))
+    else
+      None
+
+    TxCoordinatorMetadataResponse(coordinatorOpt, errorCode, correlationId)
+  }
+
+}
+
+case class TxCoordinatorMetadataResponse(coordinatorOpt: Option[Broker],
+                                         errorCode: Short,
+                                         correlationId: Int = 0)
+        extends RequestOrResponse() {
+
+  def sizeInBytes =
+    4 + /* correlationId */
+    2 + /* error code */
+    coordinatorOpt.orElse(TxCoordinatorMetadataResponse.NoBrokerOpt).get.sizeInBytes
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    coordinatorOpt.orElse(TxCoordinatorMetadataResponse.NoBrokerOpt).foreach(_.writeTo(buffer))
+  }
+
+  def describe(details: Boolean) = toString
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 5559d26..b185421 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -46,6 +46,7 @@ object ErrorMapping {
   val OffsetsLoadInProgressCode: Short = 14
   val ConsumerCoordinatorNotAvailableCode: Short = 15
   val NotCoordinatorForConsumerCode: Short = 16
+  val TxCoordinatorNotAvailableCode: Short = 17
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c4de00/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cd16ced..e3bb7a6 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -216,6 +217,27 @@ object SerializationTestUtils {
     val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11",
1)))
     JoinGroupResponseAndHeader(1, body)
   }
+
+  def createTestTransactionRequest: TransactionRequest = {
+    new TransactionRequest(1, "client 1", 1000,
+      TransactionRequestInfo("group 1", 1, 1, 1000, Seq(
+        TopicAndPartition(topic1, 0), TopicAndPartition(topic2, 0)
+      )))
+  }
+
+  def createTestTransactionResponse: TransactionResponse = {
+    val responseMap = Map((TopicAndPartition(topic1, 0), ErrorMapping.NoError),
+                          (TopicAndPartition(topic2, 0), ErrorMapping.NoError))
+    TransactionResponse(1, 1,responseMap)
+  }
+
+  def createTestTxCoordinatorMetadataRequest: TxCoordinatorMetadataRequest = {
+    TxCoordinatorMetadataRequest("txGroup 1", clientId = "client 1")
+  }
+
+  def createTestTxCoordinatorMetadataResponse: TxCoordinatorMetadataResponse = {
+    TxCoordinatorMetadataResponse(Some(brokers.head), ErrorMapping.NoError)
+  }
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -242,6 +264,10 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader
   private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader
   private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader
+  private val transactionRequest = SerializationTestUtils.createTestTransactionRequest
+  private val transactionResponse = SerializationTestUtils.createTestTransactionResponse
+  private val txCoordinatorMetadataRequest = SerializationTestUtils.createTestTxCoordinatorMetadataRequest
+  private val txCoordinatorMetadataResponse = SerializationTestUtils.createTestTxCoordinatorMetadataResponse
 
   @Test
   def testSerializationAndDeserialization() {
@@ -254,7 +280,10 @@ class RequestResponseSerializationTest extends JUnitSuite {
                                offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
                                consumerMetadataRequest, consumerMetadataResponse,
                                consumerMetadataResponseNoCoordinator, heartbeatRequest,
-                               heartbeatResponse, joinGroupRequest, joinGroupResponse)
+                               heartbeatResponse, joinGroupRequest, joinGroupResponse,
+                               transactionRequest, transactionResponse,
+                               txCoordinatorMetadataRequest, txCoordinatorMetadataResponse)
+
 
     requestsAndResponses.foreach { original =>
       val buffer = ByteBuffer.allocate(original.sizeInBytes)


Mime
View raw message