kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [20/30] git commit: KAFKA-642 Addressing Jun's follow up comments--(1) add parans to make statement more clear, (2) remove the initial offset from the fetch response since the message set itself now contains all offsets.
Date Tue, 18 Dec 2012 17:44:12 GMT
KAFKA-642 Addressing Jun's follow up comments--(1) add parans to make statement more clear,
(2) remove the initial offset from the fetch response since the message set itself now contains
all offsets.



git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1417977 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98042e91
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98042e91
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98042e91

Branch: refs/heads/trunk
Commit: 98042e912392ec01173a4fcc5413a72577ee7576
Parents: e114476
Author: Edward Jay Kreps <jkreps@apache.org>
Authored: Thu Dec 6 16:23:12 2012 +0000
Committer: Edward Jay Kreps <jkreps@apache.org>
Committed: Thu Dec 6 16:23:12 2012 +0000

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchResponse.scala  |   11 +++--------
 .../scala/kafka/api/TopicMetadataResponse.scala    |    2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |    9 ++++-----
 3 files changed, 8 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/98042e91/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 24253c7..0f989fe 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -27,29 +27,25 @@ import kafka.api.ApiUtils._
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
     val error = buffer.getShort
-    val initialOffset = buffer.getLong
     val hw = buffer.getLong
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new FetchResponsePartitionData(error, initialOffset,
-                                   hw, new ByteBufferMessageSet(messageSetBuffer))
+    new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))
   }
 
   val headerSize =
     2 + /* error code */
-    8 + /* initialOffset */
     8 + /* high watermark */
     4 /* messageSetSize */
 }
 
-case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError,
-                                      initialOffset:Long = 0L, hw: Long = -1L, messages:
MessageSet) {
+case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError, hw: Long = -1L,
messages: MessageSet) {
 
   val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes
 
-  def this(messages: MessageSet) = this(ErrorMapping.NoError, 0L, -1L, messages)
+  def this(messages: MessageSet) = this(ErrorMapping.NoError, -1L, messages)
   
 }
 
@@ -63,7 +59,6 @@ class PartitionDataSend(val partitionId: Int,
   private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize)
   buffer.putInt(partitionId)
   buffer.putShort(partitionData.error)
-  buffer.putLong(partitionData.initialOffset)
   buffer.putLong(partitionData.hw)
   buffer.putInt(partitionData.messages.sizeInBytes)
   buffer.rewind()

http://git-wip-us.apache.org/repos/asf/kafka/blob/98042e91/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index 0631201..1bf4cc4 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -56,7 +56,7 @@ case class TopicMetadataResponse(versionId: Short,
     
   def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = {
     val parts = topicsMetadata.flatMap(_.partitionsMetadata)
-    val brokers = parts.flatMap(_.replicas) ++ parts.map(_.leader).collect{case Some(l) =>
l}
+    val brokers = (parts.flatMap(_.replicas)) ++ (parts.map(_.leader).collect{case Some(l)
=> l})
     brokers.map(b => (b.id, b)).toMap
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/98042e91/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6c01025..d7a5736 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -77,7 +77,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             val apiRequest = request.requestObj.asInstanceOf[FetchRequest]
             val fetchResponsePartitionData = apiRequest.requestInfo.map {
               case (topicAndPartition, data) =>
-                (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
0, -1, null))
+                (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-1, null))
             }
             val errorResponse = FetchResponse(apiRequest.versionId, apiRequest.correlationId,
fetchResponsePartitionData)
             requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
@@ -326,19 +326,18 @@ class KafkaApis(val requestChannel: RequestChannel,
             BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
             BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes)
             if (!isFetchFromFollower) {
-              new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark,
messages)
+              new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             } else {
               debug("Leader %d for topic %s partition %d received fetch request from follower
%d"
                             .format(brokerId, topic, partition, fetchRequest.replicaId))
-              new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark,
messages)
+              new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             }
           } catch {
             case t: Throwable =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark()
               error("error when processing request " + (topic, partition, offset, fetchSize),
t)
-              new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
-                                             offset, -1L, MessageSet.Empty)
+              new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
-1L, MessageSet.Empty)
           }
         (TopicAndPartition(topic, partition), partitionData)
     }


Mime
View raw message