kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [18/30] git commit: KAFKA-642 Remove the response version and baseline all versions/magic bytes etc to 0.
Date Tue, 18 Dec 2012 17:44:12 GMT
KAFKA-642 Remove the response version and baseline all versions/magic bytes etc to 0.


git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1418181 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f5a48b06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f5a48b06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f5a48b06

Branch: refs/heads/trunk
Commit: f5a48b06e31ebe925dc99abfb6518f6057871318
Parents: a4c7675
Author: Edward Jay Kreps <jkreps@apache.org>
Authored: Fri Dec 7 04:01:29 2012 +0000
Committer: Edward Jay Kreps <jkreps@apache.org>
Committed: Fri Dec 7 04:01:29 2012 +0000

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchRequest.scala   |    2 +-
 core/src/main/scala/kafka/api/FetchResponse.scala  |    8 +---
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |    2 +-
 .../scala/kafka/api/LeaderAndIsrResponse.scala     |    8 +---
 core/src/main/scala/kafka/api/OffsetRequest.scala  |    2 +-
 core/src/main/scala/kafka/api/OffsetResponse.scala |    8 +---
 .../src/main/scala/kafka/api/ProducerRequest.scala |    2 +-
 .../main/scala/kafka/api/ProducerResponse.scala    |    9 +---
 .../main/scala/kafka/api/StopReplicaRequest.scala  |    2 +-
 .../main/scala/kafka/api/StopReplicaResponse.scala |    8 +---
 .../scala/kafka/api/TopicMetadataRequest.scala     |    2 +-
 .../scala/kafka/api/TopicMetadataResponse.scala    |    9 +---
 core/src/main/scala/kafka/server/KafkaApis.scala   |   30 +++++++-------
 .../api/RequestResponseSerializationTest.scala     |   12 +++---
 .../unit/kafka/producer/AsyncProducerTest.scala    |    4 +-
 15 files changed, 42 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 4ed071a..b4fb874 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -30,7 +30,7 @@ case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
 
 object FetchRequest {
-  val CurrentVersion = 1.shortValue()
+  val CurrentVersion = 0.shortValue
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
   val ReplicaFetcherClientId = "replica-fetcher"

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 0f989fe..94650f1 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -136,12 +136,10 @@ class TopicDataSend(val topicData: TopicData) extends Send {
 object FetchResponse {
 
   val headerSize =
-    2 + /* versionId */
     4 + /* correlationId */
     4 /* topic count */
 
   def readFrom(buffer: ByteBuffer): FetchResponse = {
-    val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
@@ -151,13 +149,12 @@ object FetchResponse {
           (TopicAndPartition(topicData.topic, partitionId), partitionData)
       }
     })
-    FetchResponse(versionId, correlationId, Map(pairs:_*))
+    FetchResponse(correlationId, Map(pairs:_*))
   }
 }
 
 
-case class FetchResponse(versionId: Short,
-                         correlationId: Int,
+case class FetchResponse(correlationId: Int,
                          data: Map[TopicAndPartition, FetchResponsePartitionData])  {
 
   /**
@@ -206,7 +203,6 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send
{
 
   private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
   buffer.putInt(size)
-  buffer.putShort(fetchResponse.versionId)
   buffer.putInt(fetchResponse.correlationId)
   buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
   buffer.rewind()

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 7459f4a..9759949 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -79,7 +79,7 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
 }
 
 object LeaderAndIsrRequest {
-  val CurrentVersion = 1.shortValue()
+  val CurrentVersion = 0.shortValue
   val DefaultClientId = ""
   val IsInit: Boolean = true
   val NotInit: Boolean = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
index c8f1630..dbd85d0 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
@@ -26,7 +26,6 @@ import collection.Map
 
 object LeaderAndIsrResponse {
   def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
-    val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val errorCode = buffer.getShort
     val numEntries = buffer.getInt
@@ -37,19 +36,17 @@ object LeaderAndIsrResponse {
       val partitionErrorCode = buffer.getShort
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new LeaderAndIsrResponse(versionId, correlationId, responseMap, errorCode)
+    new LeaderAndIsrResponse(correlationId, responseMap, errorCode)
   }
 }
 
 
-case class LeaderAndIsrResponse(versionId: Short,
-                                correlationId: Int,
+case class LeaderAndIsrResponse(correlationId: Int,
                                 responseMap: Map[(String, Int), Short],
                                 errorCode: Short = ErrorMapping.NoError)
         extends RequestOrResponse {
   def sizeInBytes(): Int ={
     var size =
-      2 /* version id */ +
       4 /* correlation id */ + 
       2 /* error code */ +
       4 /* number of responses */
@@ -63,7 +60,6 @@ case class LeaderAndIsrResponse(versionId: Short,
   }
 
   def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
     buffer.putInt(correlationId)
     buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 5239538..6c522bc 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -23,7 +23,7 @@ import kafka.api.ApiUtils._
 
 
 object OffsetRequest {
-  val CurrentVersion = 1.shortValue()
+  val CurrentVersion = 0.shortValue
   val DefaultClientId = ""
 
   val SmallestTimeString = "smallest"

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index 7818b66..264e200 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -25,7 +25,6 @@ import kafka.api.ApiUtils._
 object OffsetResponse {
 
   def readFrom(buffer: ByteBuffer): OffsetResponse = {
-    val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val numTopics = buffer.getInt
     val pairs = (1 to numTopics).flatMap(_ => {
@@ -39,7 +38,7 @@ object OffsetResponse {
         (TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets))
       })
     })
-    OffsetResponse(versionId, correlationId, Map(pairs:_*))
+    OffsetResponse(correlationId, Map(pairs:_*))
   }
 
 }
@@ -48,8 +47,7 @@ object OffsetResponse {
 case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])
 
 
-case class OffsetResponse(versionId: Short,
-                          correlationId: Int,
+case class OffsetResponse(correlationId: Int,
                           partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
         extends RequestOrResponse {
 
@@ -58,7 +56,6 @@ case class OffsetResponse(versionId: Short,
   def hasError = partitionErrorAndOffsets.values.exists(_.error != ErrorMapping.NoError)
 
   val sizeInBytes = {
-    2 + /* versionId */
     4 + /* correlation id */
     4 + /* topic count */
     offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
@@ -77,7 +74,6 @@ case class OffsetResponse(versionId: Short,
   }
 
   def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
     buffer.putInt(correlationId)
     buffer.putInt(offsetsGroupedByTopic.size) // topic count
     offsetsGroupedByTopic.foreach {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 87700a0..9edc4dd 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -25,7 +25,7 @@ import kafka.api.ApiUtils._
 
 
 object ProducerRequest {
-  val CurrentVersion: Short = 0
+  val CurrentVersion = 0.shortValue
 
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 62c9bc4..743227d 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -25,7 +25,6 @@ import kafka.api.ApiUtils._
 
 object ProducerResponse {
   def readFrom(buffer: ByteBuffer): ProducerResponse = {
-    val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val topicCount = buffer.getInt
     val statusPairs = (1 to topicCount).flatMap(_ => {
@@ -39,15 +38,14 @@ object ProducerResponse {
       })
     })
 
-    ProducerResponse(versionId, correlationId, Map(statusPairs:_*))
+    ProducerResponse(correlationId, Map(statusPairs:_*))
   }
 }
 
 case class ProducerResponseStatus(error: Short, offset: Long)
 
 
-case class ProducerResponse(versionId: Short,
-                            correlationId: Int,
+case class ProducerResponse(correlationId: Int,
                            status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse {
 
   /**
@@ -59,7 +57,6 @@ case class ProducerResponse(versionId: Short,
 
   val sizeInBytes = {
     val groupedStatus = statusGroupedByTopic
-    2 + /* version id */
     4 + /* correlation id */
     4 + /* topic count */
     groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
@@ -76,8 +73,6 @@ case class ProducerResponse(versionId: Short,
 
   def writeTo(buffer: ByteBuffer) {
     val groupedStatus = statusGroupedByTopic
-
-    buffer.putShort(versionId)
     buffer.putInt(correlationId)
     buffer.putInt(groupedStatus.size) // topic count
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 6583d64..deb195f 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -25,7 +25,7 @@ import kafka.network.InvalidRequestException
 
 
 object StopReplicaRequest extends Logging {
-  val CurrentVersion = 1.shortValue()
+  val CurrentVersion = 0.shortValue
   val DefaultClientId = ""
   val DefaultAckTimeout = 100
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/StopReplicaResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index 6062a0c..fa66b99 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -26,7 +26,6 @@ import kafka.api.ApiUtils._
 
 object StopReplicaResponse {
   def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
-    val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val errorCode = buffer.getShort
     val numEntries = buffer.getInt
@@ -38,18 +37,16 @@ object StopReplicaResponse {
       val partitionErrorCode = buffer.getShort()
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new StopReplicaResponse(versionId, correlationId, responseMap.toMap, errorCode)
+    new StopReplicaResponse(correlationId, responseMap.toMap, errorCode)
   }
 }
 
 
-case class StopReplicaResponse(val versionId: Short,
-                               val correlationId: Int,
+case class StopReplicaResponse(val correlationId: Int,
                                val responseMap: Map[(String, Int), Short],
                                val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
   def sizeInBytes(): Int ={
     var size =
-      2 /* version id */ +
       4 /* correlation id */ + 
       2 /* error code */ +
       4 /* number of responses */
@@ -63,7 +60,6 @@ case class StopReplicaResponse(val versionId: Short,
   }
 
   def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
     buffer.putInt(correlationId)
     buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 0a99779..5bdb2c1 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -23,7 +23,7 @@ import collection.mutable.ListBuffer
 import kafka.utils.Logging
 
 object TopicMetadataRequest extends Logging {
-  val CurrentVersion = 1.shortValue()
+  val CurrentVersion = 0.shortValue
   val DefaultClientId = ""
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index 1bf4cc4..af76776 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -23,27 +23,24 @@ import java.nio.ByteBuffer
 object TopicMetadataResponse {
 
   def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
-    val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val brokerCount = buffer.getInt
     val brokers = (0 until brokerCount).map(_ => Broker.readFrom(buffer))
     val brokerMap = brokers.map(b => (b.id, b)).toMap
     val topicCount = buffer.getInt
    val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
-    new TopicMetadataResponse(versionId, topicsMetadata, correlationId)
+    new TopicMetadataResponse(topicsMetadata, correlationId)
   }
 }
 
-case class TopicMetadataResponse(versionId: Short,
-                                 topicsMetadata: Seq[TopicMetadata],
+case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
                                  correlationId: Int) extends RequestOrResponse {
   val sizeInBytes: Int = {
     val brokers = extractBrokers(topicsMetadata).values
-    2 + 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
+    4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
   }
 
   def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
     buffer.putInt(correlationId)
     /* brokers */
     val brokers = extractBrokers(topicsMetadata).values

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d7a5736..ac90b20 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -70,7 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               case (topicAndPartition, data) =>
                (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
             }
-            val errorResponse = ProducerResponse(apiRequest.versionId, apiRequest.correlationId, producerResponseStatus)
+            val errorResponse = ProducerResponse(apiRequest.correlationId, producerResponseStatus)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
             error("error when handling request %s".format(apiRequest), e)
           case RequestKeys.FetchKey =>
@@ -79,7 +79,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               case (topicAndPartition, data) =>
                (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null))
             }
-            val errorResponse = FetchResponse(apiRequest.versionId, apiRequest.correlationId, fetchResponsePartitionData)
+            val errorResponse = FetchResponse(apiRequest.correlationId, fetchResponsePartitionData)
             requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
             error("error when handling request %s".format(apiRequest), e)
           case RequestKeys.OffsetsKey =>
@@ -88,7 +88,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               case (topicAndPartition, partitionOffsetRequest) =>
                (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null))
             }
-            val errorResponse = OffsetResponse(apiRequest.versionId, apiRequest.correlationId, partitionOffsetResponseMap)
+            val errorResponse = OffsetResponse(apiRequest.correlationId, partitionOffsetResponseMap)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
             error("error when handling request %s".format(apiRequest), e)
           case RequestKeys.MetadataKey =>
@@ -96,7 +96,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             val topicMeatadata = apiRequest.topics.map {
               topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
             }
-            val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata, apiRequest.correlationId)
+            val errorResponse = TopicMetadataResponse(topicMeatadata, apiRequest.correlationId)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
             error("error when handling request %s".format(apiRequest), e)
           case RequestKeys.LeaderAndIsrKey =>
@@ -104,7 +104,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             val responseMap = apiRequest.partitionStateInfos.map {
               case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
             }
-            val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, apiRequest.correlationId, responseMap)
+            val errorResponse = LeaderAndIsrResponse(apiRequest.correlationId, responseMap)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
             error("error when handling request %s".format(apiRequest), e)
           case RequestKeys.StopReplicaKey =>
@@ -113,7 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
             }.toMap
             error("error when handling request %s".format(apiRequest), e)
-            val errorResponse = StopReplicaResponse(apiRequest.versionId, apiRequest.correlationId, responseMap)
+            val errorResponse = StopReplicaResponse(apiRequest.correlationId, responseMap)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
         }
     } finally
@@ -127,7 +127,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace("Handling leader and ISR request " + leaderAndIsrRequest)
     try {
       val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
-      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, response, error)
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
       requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
@@ -144,7 +144,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace("Handling stop replica request " + stopReplicaRequest)
 
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
-    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, stopReplicaRequest.correlationId, response.toMap, error)
+    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
 
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
@@ -161,7 +161,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
       val topicData = readMessageSets(fetchReq.fetch)
-      val response = FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+      val response = FetchResponse(fetchReq.fetch.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
     }
   }
@@ -192,7 +192,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         allPartitionHaveReplicationFactorOne ||
         numPartitionsInError == produceRequest.numPartitions) {
      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
-      val response = ProducerResponse(produceRequest.versionId, produceRequest.correlationId, statuses)
+      val response = ProducerResponse(produceRequest.correlationId, statuses)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     } else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
@@ -292,7 +292,7 @@ class KafkaApis(val requestChannel: RequestChannel,
        bytesReadable >= fetchRequest.minBytes ||
        fetchRequest.numPartitions <= 0) {
      debug("Returning fetch response %s for fetch request with correlation id %d".format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId))
-      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, dataRead)
+      val response = new FetchResponse(fetchRequest.correlationId, dataRead)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
       debug("Putting fetch request into purgatory")
@@ -408,7 +408,7 @@ class KafkaApis(val requestChannel: RequestChannel,
          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
       }
     })
-    val response = OffsetResponse(OffsetRequest.CurrentVersion, offsetRequest.correlationId, responseMap)
+    val response = OffsetResponse(offsetRequest.correlationId, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
@@ -457,7 +457,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       })
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
-    val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq, metadataRequest.correlationId)
+    val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
@@ -514,7 +514,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       debug("Expiring fetch request %s.".format(delayed.fetch))
       try {
         val topicData = readMessageSets(delayed.fetch)
-        val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+        val response = FetchResponse(delayed.fetch.correlationId, topicData)
         val fromFollower = delayed.fetch.isFromFollower
         delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
         requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
@@ -564,7 +564,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
         })
       
-      val response = ProducerResponse(produce.versionId, produce.correlationId, finalErrorsAndOffsets)
+      val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets)
 
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 391e724..509b020 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -95,7 +95,7 @@ object SerializationTestUtils{
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
     val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
                           ((topic2, 0), ErrorMapping.NoError))
-    new LeaderAndIsrResponse(1, 1, responseMap)
+    new LeaderAndIsrResponse(1, responseMap)
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
@@ -105,7 +105,7 @@ object SerializationTestUtils{
   def createTestStopReplicaResponse() : StopReplicaResponse = {
     val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
                           ((topic2, 0), ErrorMapping.NoError))
-    new StopReplicaResponse(1, 0, responseMap.toMap)
+    new StopReplicaResponse(0, responseMap.toMap)
   }
 
   def createTestProducerRequest: ProducerRequest = {
@@ -113,7 +113,7 @@ object SerializationTestUtils{
   }
 
   def createTestProducerResponse: ProducerResponse =
-    ProducerResponse(1, 1, Map(
+    ProducerResponse(1, Map(
       TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001),
       TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001)
     ))
@@ -123,7 +123,7 @@ object SerializationTestUtils{
   }
 
   def createTestFetchResponse: FetchResponse = {
-    FetchResponse(1, 1, topicDataFetchResponse)
+    FetchResponse(1, topicDataFetchResponse)
   }
 
   def createTestOffsetRequest = new OffsetRequest(
@@ -132,7 +132,7 @@ object SerializationTestUtils{
   )
 
   def createTestOffsetResponse: OffsetResponse = {
-    new OffsetResponse(OffsetRequest.CurrentVersion, 0, collection.immutable.Map(
+    new OffsetResponse(0, collection.immutable.Map(
      TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(ErrorMapping.NoError, Seq(1000l, 2000l, 3000l, 4000l)))
     )
   }
@@ -142,7 +142,7 @@ object SerializationTestUtils{
   }
 
   def createTestTopicMetadataResponse: TopicMetadataResponse = {
-    new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2), 1)
+    new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f5a48b06/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 534df19..90a7ed8 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -429,11 +429,11 @@ class AsyncProducerTest extends JUnit3Suite {
     // On the third try for partition 0, let it succeed.
    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 0)
    val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 1)
-    val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
+    val response1 = ProducerResponse(0,
      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
           (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 2)
-    val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
+    val response2 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
    EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException


Mime
View raw message