kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject [18/37] kafka-1462; Add new request and response formats for the new consumer and coordinator communication; patched by Jun Rao; reviewed by Guozhang Wang and Jay Kreps
Date Tue, 05 Aug 2014 23:00:15 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 630768a..861a6cf 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -26,7 +26,7 @@ import kafka.network.RequestChannel.Response
 import scala.collection._
 
 object OffsetCommitRequest extends Logging {
-  val CurrentVersion: Short = 0
+  val CurrentVersion: Short = 1
   val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
@@ -34,11 +34,23 @@ object OffsetCommitRequest extends Logging {
 
     // Read values from the envelope
     val versionId = buffer.getShort
+    assert(versionId == 0 || versionId == 1,
+           "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0 or 1.")
+
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
 
     // Read the OffsetRequest 
     val consumerGroupId = readShortString(buffer)
+
+    // version 1 specific fields
+    var groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID
+    var consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID
+    if (versionId == 1) {
+      groupGenerationId = buffer.getInt
+      consumerId = readShortString(buffer)
+    }
+
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topic = readShortString(buffer)
@@ -54,16 +66,20 @@ object OffsetCommitRequest extends Logging {
         (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
       })
     })
-    OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId)
+    OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId)
   }
 }
 
 case class OffsetCommitRequest(groupId: String,
                                requestInfo: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                                versionId: Short = OffsetCommitRequest.CurrentVersion,
-                               override val correlationId: Int = 0,
-                               clientId: String = OffsetCommitRequest.DefaultClientId)
-    extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey), correlationId) {
+                               correlationId: Int = 0,
+                               clientId: String = OffsetCommitRequest.DefaultClientId,
+                               groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID,
+                               consumerId: String =  org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID)
+    extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
+  assert(versionId == 0 || versionId == 1,
+         "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0 or 1.")
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
@@ -84,7 +100,6 @@ case class OffsetCommitRequest(groupId: String,
     OffsetCommitResponse(commitStatus, correlationId)
   }
 
-
   def writeTo(buffer: ByteBuffer) {
     // Write envelope
     buffer.putShort(versionId)
@@ -93,6 +108,12 @@ case class OffsetCommitRequest(groupId: String,
 
     // Write OffsetCommitRequest
     writeShortString(buffer, groupId)             // consumer group
+
+    // version 1 specific data
+    if (versionId == 1) {
+      buffer.putInt(groupGenerationId)
+      writeShortString(buffer, consumerId)
+    }
     buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
     requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
       writeShortString(buffer, t1._1) // topic
@@ -110,7 +131,8 @@ case class OffsetCommitRequest(groupId: String,
     2 + /* versionId */
     4 + /* correlationId */
     shortStringLength(clientId) +
-    shortStringLength(groupId) + 
+    shortStringLength(groupId) +
+    (if (versionId == 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
       val (topic, offsets) = topicAndOffsets
@@ -139,6 +161,8 @@ case class OffsetCommitRequest(groupId: String,
     offsetCommitRequest.append("; CorrelationId: " + correlationId)
     offsetCommitRequest.append("; ClientId: " + clientId)
     offsetCommitRequest.append("; GroupId: " + groupId)
+    offsetCommitRequest.append("; GroupGenerationId: " + groupGenerationId)
+    offsetCommitRequest.append("; ConsumerId: " + consumerId)
     if(details)
       offsetCommitRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     offsetCommitRequest.toString()

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 4946e97..624a1c1 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -42,8 +42,8 @@ object OffsetCommitResponse extends Logging {
 }
 
 case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
-                               override val correlationId: Int = 0)
-    extends RequestOrResponse(correlationId=correlationId) {
+                                correlationId: Int = 0)
+    extends RequestOrResponse() {
 
   lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index a32f858..c7604b9 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -52,9 +52,9 @@ object OffsetFetchRequest extends Logging {
 case class OffsetFetchRequest(groupId: String,
                               requestInfo: Seq[TopicAndPartition],
                               versionId: Short = OffsetFetchRequest.CurrentVersion,
-                              override val correlationId: Int = 0,
+                              correlationId: Int = 0,
                               clientId: String = OffsetFetchRequest.DefaultClientId)
-    extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey), correlationId) {
+    extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
index c1222f4..e3523f8 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -45,8 +45,8 @@ object OffsetFetchResponse extends Logging {
 }
 
 case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
-                               override val correlationId: Int = 0)
-    extends RequestOrResponse(correlationId = correlationId) {
+                               correlationId: Int = 0)
+    extends RequestOrResponse() {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 7cbc26c..3d483bc 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -57,10 +57,10 @@ case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
 
 case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
                          versionId: Short = OffsetRequest.CurrentVersion,
-                         override val correlationId: Int = 0,
+                         correlationId: Int = 0,
                          clientId: String = OffsetRequest.DefaultClientId,
                          replicaId: Int = Request.OrdinaryConsumerId)
-    extends RequestOrResponse(Some(RequestKeys.OffsetsKey), correlationId) {
+    extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
 
  def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index 0e1d6e3..63c0899 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -51,9 +51,9 @@ case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
 }
 
 
-case class OffsetResponse(override val correlationId: Int,
+case class OffsetResponse(correlationId: Int,
                           partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
-    extends RequestOrResponse(correlationId = correlationId) {
+    extends RequestOrResponse() {
 
   lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 0c295a2..b2366e7 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -53,12 +53,12 @@ object ProducerRequest {
 }
 
 case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
-                           override val correlationId: Int,
+                           correlationId: Int,
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
                            data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
-    extends RequestOrResponse(Some(RequestKeys.ProduceKey), correlationId) {
+    extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 5a1d801..a286272 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -43,9 +43,9 @@ object ProducerResponse {
 
 case class ProducerResponseStatus(var error: Short, offset: Long)
 
-case class ProducerResponse(override val correlationId: Int,
+case class ProducerResponse(correlationId: Int,
                             status: Map[TopicAndPartition, ProducerResponseStatus])
-    extends RequestOrResponse(correlationId = correlationId) {
+    extends RequestOrResponse() {
 
   /**
    * Partitions the status map into a map of maps (one for each topic).

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index fbfc9d3..c24c034 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -32,6 +32,8 @@ object RequestKeys {
   val OffsetCommitKey: Short = 8
   val OffsetFetchKey: Short = 9
   val ConsumerMetadataKey: Short = 10
+  val JoinGroupKey: Short = 11
+  val HeartbeatKey: Short = 12
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -44,7 +46,10 @@ object RequestKeys {
         ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
         OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
         OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
-        ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom))
+        ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
+        JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
+        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
+    )
 
   def nameForKey(key: Short): String = {
     keyToNameAndDeserializerMap.get(key) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/RequestOrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 57f87a4..0334343 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -30,7 +30,7 @@ object Request {
 }
 
 
-private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging {
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Logging {
 
   def sizeInBytes: Int
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 68fc138..5e14987 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -54,13 +54,13 @@ object StopReplicaRequest extends Logging {
 }
 
 case class StopReplicaRequest(versionId: Short,
-                              override val correlationId: Int,
+                              correlationId: Int,
                               clientId: String,
                               controllerId: Int,
                               controllerEpoch: Int,
                               deletePartitions: Boolean,
                               partitions: Set[TopicAndPartition])
-        extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) {
+        extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
 
  def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
     this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/StopReplicaResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index c90ddee..3431f3f 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -42,10 +42,10 @@ object StopReplicaResponse {
 }
 
 
-case class StopReplicaResponse(override val correlationId: Int,
+case class StopReplicaResponse(val correlationId: Int,
                                val responseMap: Map[TopicAndPartition, Short],
                                val errorCode: Short = ErrorMapping.NoError)
-    extends RequestOrResponse(correlationId = correlationId) {
+    extends RequestOrResponse() {
   def sizeInBytes(): Int ={
     var size =
       4 /* correlation id */ + 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index bce004f..7dca09c 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -47,10 +47,10 @@ object TopicMetadataRequest extends Logging {
 }
 
 case class TopicMetadataRequest(val versionId: Short,
-                                override val correlationId: Int,
+                                val correlationId: Int,
                                 val clientId: String,
                                 val topics: Seq[String])
- extends RequestOrResponse(Some(RequestKeys.MetadataKey), correlationId){
+ extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
   def this(topics: Seq[String], correlationId: Int) =
    this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index b233d35..92ac4e6 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -35,8 +35,8 @@ object TopicMetadataResponse {
 
 case class TopicMetadataResponse(brokers: Seq[Broker],
                                  topicsMetadata: Seq[TopicMetadata],
-                                 override val correlationId: Int)
-    extends RequestOrResponse(correlationId = correlationId) {
+                                 correlationId: Int)
+    extends RequestOrResponse() {
   val sizeInBytes: Int = {
     4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 543e262..530982e 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -55,13 +55,13 @@ object UpdateMetadataRequest {
 }
 
 case class UpdateMetadataRequest (versionId: Short,
-                                  override val correlationId: Int,
+                                  correlationId: Int,
                                   clientId: String,
                                   controllerId: Int,
                                   controllerEpoch: Int,
                                   partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo],
                                   aliveBrokers: Set[Broker])
-  extends RequestOrResponse(Some(RequestKeys.UpdateMetadataKey), correlationId) {
+  extends RequestOrResponse(Some(RequestKeys.UpdateMetadataKey)) {
 
   def this(controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String,
           partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo], aliveBrokers: Set[Broker]) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
index c583c1f..53f6067 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
@@ -32,9 +32,9 @@ object UpdateMetadataResponse {
   }
 }
 
-case class UpdateMetadataResponse(override val correlationId: Int,
+case class UpdateMetadataResponse(correlationId: Int,
                                   errorCode: Short = ErrorMapping.NoError)
-  extends RequestOrResponse(correlationId = correlationId) {
+  extends RequestOrResponse() {
   def sizeInBytes(): Int = 4 /* correlation id */ + 2 /* error code */
 
   def writeTo(buffer: ByteBuffer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 8763968..ecbfa0f 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -133,9 +133,9 @@ class RequestSendThread(val controllerId: Int,
             isSendSuccessful = true
           } catch {
            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
-              error(("Controller %d epoch %d failed to send %s request with correlation id %s to broker %s. " +
+              error(("Controller %d epoch %d failed to send request %s to broker %s. " +
                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
-                RequestKeys.nameForKey(request.requestId.get), request.correlationId, toBroker.toString()), e)
+                request.toString, toBroker.toString()), e)
               channel.disconnect()
               connectToBroker(toBroker, channel)
               isSendSuccessful = false
@@ -153,8 +153,8 @@ class RequestSendThread(val controllerId: Int,
           case RequestKeys.UpdateMetadataKey =>
             response = UpdateMetadataResponse.readFrom(receive.buffer)
         }
-        stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %s"
-                                  .format(controllerId, controllerContext.epoch, response.correlationId, toBroker.toString()))
+        stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
+                                  .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
 
         if(callback != null) {
           callback(response)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 08dcc55..27fc1eb 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -21,7 +21,6 @@ import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 
 class OffsetCommitRequest(groupId: String,
                           requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
-                          versionId: Short,
                           correlationId: Int,
                           clientId: String) {
   val underlying = {
@@ -33,7 +32,6 @@ class OffsetCommitRequest(groupId: String,
     kafka.api.OffsetCommitRequest(
       groupId = groupId,
       requestInfo = scalaMap,
-      versionId = versionId,
       correlationId = correlationId,
       clientId = clientId
     )

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 7e6da16..b0b7be1 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -24,10 +24,10 @@ import kafka.common.ErrorMapping
 import kafka.network.RequestChannel.Response
 
 class TopicMetadataRequest(val versionId: Short,
-                           override val correlationId: Int,
+                           val correlationId: Int,
                            val clientId: String,
                            val topics: java.util.List[String])
-    extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
+    extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
 
   val underlying: kafka.api.TopicMetadataRequest = {
     import scala.collection.JavaConversions._

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc0e03f7/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index d34ddf5..847a36b 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -23,9 +23,14 @@ import junit.framework.Assert._
 import java.nio.ByteBuffer
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.cluster.Broker
-import kafka.common.{OffsetAndMetadata, TopicAndPartition, ErrorMapping, OffsetMetadataAndError}
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError}
 import kafka.utils.SystemTime
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.protocol.ApiKeys
+import scala.Some
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
 
 
 object SerializationTestUtils {
@@ -146,13 +151,23 @@ object SerializationTestUtils {
     new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1)
   }
 
-  def createTestOffsetCommitRequest: OffsetCommitRequest = {
+  def createTestOffsetCommitRequestV1: OffsetCommitRequest = {
     new OffsetCommitRequest("group 1", collection.immutable.Map(
      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
      TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
     ))
   }
 
+  def createTestOffsetCommitRequestV0: OffsetCommitRequest = {
+    new OffsetCommitRequest(
+      versionId = 0,
+      groupId = "group 1",
+      requestInfo = collection.immutable.Map(
+        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
+        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
+      ))
+  }
+
   def createTestOffsetCommitResponse: OffsetCommitResponse = {
     new OffsetCommitResponse(collection.immutable.Map(TopicAndPartition(topic1, 0) ->
ErrorMapping.NoError,
                                  TopicAndPartition(topic1, 1) -> ErrorMapping.NoError))
@@ -180,6 +195,31 @@ object SerializationTestUtils {
     ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError)
   }
 
+  def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = {
+    val header = new RequestHeader(ApiKeys.HEARTBEAT.id, 0.asInstanceOf[Short], "", 1)
+    val body = new HeartbeatRequest("group1", 1, "consumer1")
+    HeartbeatRequestAndHeader(header, body)
+  }
+
+  def createHeartbeatResponseAndHeader: HeartbeatResponseAndHeader = {
+    val header = new ResponseHeader(1)
+    val body = new HeartbeatResponse(0.asInstanceOf[Short])
+    HeartbeatResponseAndHeader(header, body)
+  }
+
+  def createJoinGroupRequestAndHeader: JoinGroupRequestAndHeader = {
+    import scala.collection.JavaConversions._
+    val header = new RequestHeader(ApiKeys.JOIN_GROUP.id, 0.asInstanceOf[Short], "", 1)
+    val body = new JoinGroupRequest("group1", 30000, List("topic1"), "consumer1", "strategy1");
+    JoinGroupRequestAndHeader(header, body)
+  }
+
+  def createJoinGroupResponseAndHeader: JoinGroupResponseAndHeader = {
+    import scala.collection.JavaConversions._
+    val header = new ResponseHeader(1)
+    val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1)))
+    JoinGroupResponseAndHeader(header, body)
+  }
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -194,27 +234,31 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetResponse = SerializationTestUtils.createTestOffsetResponse
   private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest
   private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse
-  private val offsetCommitRequest = SerializationTestUtils.createTestOffsetCommitRequest
+  private val offsetCommitRequestV0 = SerializationTestUtils.createTestOffsetCommitRequestV0
+  private val offsetCommitRequestV1 = SerializationTestUtils.createTestOffsetCommitRequestV1
   private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse
   private val offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
   private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode)
+  private val heartbeatRequest = SerializationTestUtils.createHeartbeatRequestAndHeader
+  private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader
+  private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader
+  private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader
 
   @Test
   def testSerializationAndDeserialization() {
 
     val requestsAndResponses =
-      collection.immutable.Seq(leaderAndIsrRequest, leaderAndIsrResponse,
-                               stopReplicaRequest, stopReplicaResponse,
-                               producerRequest, producerResponse,
-                               fetchRequest,
-                               offsetRequest, offsetResponse,
-                               topicMetadataRequest, topicMetadataResponse,
-                               offsetCommitRequest, offsetCommitResponse,
-                               offsetFetchRequest, offsetFetchResponse,
-                               consumerMetadataRequest, consumerMetadataResponse, consumerMetadataResponseNoCoordinator)
+      collection.immutable.Seq(leaderAndIsrRequest, leaderAndIsrResponse, stopReplicaRequest,
+                               stopReplicaResponse, producerRequest, producerResponse,
+                               fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest,
+                               topicMetadataResponse, offsetCommitRequestV0, offsetCommitRequestV1,
+                               offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
+                               consumerMetadataRequest, consumerMetadataResponse,
+                               consumerMetadataResponseNoCoordinator, heartbeatRequest,
+                               heartbeatResponse, joinGroupRequest, joinGroupResponse)
 
     requestsAndResponses.foreach { original =>
       val buffer = ByteBuffer.allocate(original.sizeInBytes)
@@ -222,7 +266,9 @@ class RequestResponseSerializationTest extends JUnitSuite {
       buffer.rewind()
       val deserializer = original.getClass.getDeclaredMethod("readFrom", classOf[ByteBuffer])
       val deserialized = deserializer.invoke(null, buffer)
-      assertEquals("The original and deserialized request/response should be the same.", original, deserialized)
+      assertFalse("All serialized bytes in " + original.getClass.getSimpleName + " should have been consumed",
+                  buffer.hasRemaining)
+      assertEquals("The original and deserialized for " + original.getClass.getSimpleName + " should be the same.", original, deserialized)
     }
   }
 }


Mime
View raw message