kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1417977 - in /kafka/branches/0.8/core/src/main/scala/kafka: api/FetchResponse.scala api/TopicMetadataResponse.scala server/KafkaApis.scala
Date Thu, 06 Dec 2012 16:23:13 GMT
Author: jkreps
Date: Thu Dec  6 16:23:12 2012
New Revision: 1417977

URL: http://svn.apache.org/viewvc?rev=1417977&view=rev
Log:
KAFKA-642 Addressing Jun's follow-up comments: (1) add parens to make the statement clearer,
(2) remove the initial offset from the fetch response, since the message set itself now contains
all offsets.


Modified:
    kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
    kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala

Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1417977&r1=1417976&r2=1417977&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Thu Dec  6 16:23:12 2012
@@ -27,29 +27,25 @@ import kafka.api.ApiUtils._
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
     val error = buffer.getShort
-    val initialOffset = buffer.getLong
     val hw = buffer.getLong
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new FetchResponsePartitionData(error, initialOffset,
-                                   hw, new ByteBufferMessageSet(messageSetBuffer))
+    new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))
   }
 
   val headerSize =
     2 + /* error code */
-    8 + /* initialOffset */
     8 + /* high watermark */
     4 /* messageSetSize */
 }
 
-case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError,
-                                      initialOffset:Long = 0L, hw: Long = -1L, messages:
MessageSet) {
+case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError, hw: Long = -1L,
messages: MessageSet) {
 
   val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes
 
-  def this(messages: MessageSet) = this(ErrorMapping.NoError, 0L, -1L, messages)
+  def this(messages: MessageSet) = this(ErrorMapping.NoError, -1L, messages)
   
 }
 
@@ -63,7 +59,6 @@ class PartitionDataSend(val partitionId:
   private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize)
   buffer.putInt(partitionId)
   buffer.putShort(partitionData.error)
-  buffer.putLong(partitionData.initialOffset)
   buffer.putLong(partitionData.hw)
   buffer.putInt(partitionData.messages.sizeInBytes)
   buffer.rewind()

Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala?rev=1417977&r1=1417976&r2=1417977&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala Thu Dec  6 16:23:12 2012
@@ -56,7 +56,7 @@ case class TopicMetadataResponse(version
     
   def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = {
     val parts = topicsMetadata.flatMap(_.partitionsMetadata)
-    val brokers = parts.flatMap(_.replicas) ++ parts.map(_.leader).collect{case Some(l) =>
l}
+    val brokers = (parts.flatMap(_.replicas)) ++ (parts.map(_.leader).collect{case Some(l)
=> l})
     brokers.map(b => (b.id, b)).toMap
   }
 }

Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1417977&r1=1417976&r2=1417977&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Dec  6 16:23:12 2012
@@ -77,7 +77,7 @@ class KafkaApis(val requestChannel: Requ
             val apiRequest = request.requestObj.asInstanceOf[FetchRequest]
             val fetchResponsePartitionData = apiRequest.requestInfo.map {
               case (topicAndPartition, data) =>
-                (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
0, -1, null))
+                (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-1, null))
             }
             val errorResponse = FetchResponse(apiRequest.versionId, apiRequest.correlationId,
fetchResponsePartitionData)
             requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
@@ -326,19 +326,18 @@ class KafkaApis(val requestChannel: Requ
             BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
             BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes)
             if (!isFetchFromFollower) {
-              new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark,
messages)
+              new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             } else {
               debug("Leader %d for topic %s partition %d received fetch request from follower
%d"
                             .format(brokerId, topic, partition, fetchRequest.replicaId))
-              new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark,
messages)
+              new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             }
           } catch {
             case t: Throwable =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark()
               error("error when processing request " + (topic, partition, offset, fetchSize),
t)
-              new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
-                                             offset, -1L, MessageSet.Empty)
+              new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
-1L, MessageSet.Empty)
           }
         (TopicAndPartition(topic, partition), partitionData)
     }



Mime
View raw message