kafka-commits mailing list archives

From: nehanarkh...@apache.org
Subject: svn commit: r1397744 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/consumer/ main/scala/kafka/network/ main/scala/kafka/producer/async/ main/scala/kafka/server/ test/scala/unit/kafka/api/ test/scala/unit/kafka/netw...
Date: Sat, 13 Oct 2012 00:21:46 GMT
Author: nehanarkhede
Date: Sat Oct 13 00:21:45 2012
New Revision: 1397744

URL: http://svn.apache.org/viewvc?rev=1397744&view=rev
Log:
KAFKA-548 remove partition from ProducerRequestPartitionData and FetchResponsePartitionData;
patched by Yang Ye; reviewed by Neha and Joel

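As a minimal stand-alone sketch of what this patch does (hypothetical simplified types, not the actual kafka.api classes): the partition id moves out of the per-partition payload objects and into the TopicAndPartition map key or an explicit parameter, so it is carried once instead of twice.

    // Hypothetical stand-in types; the real classes live in kafka.api and kafka.common.
    case class TopicAndPartition(topic: String, partition: Int)
    // Before: the per-partition payload repeated the partition id.
    case class OldFetchData(partition: Int, error: Short, messageSetSize: Int)
    // After: the payload keeps only error/offset/message fields; the partition id
    // lives solely in the TopicAndPartition key (or an explicit parameter).
    case class NewFetchData(error: Short, messageSetSize: Int)

    object ChangeSketch extends App {
      val before = Map(TopicAndPartition("my-topic", 0) -> OldFetchData(0, 0, 42)) // id stored twice
      val after  = Map(TopicAndPartition("my-topic", 0) -> NewFetchData(0, 42))    // id stored once
      after.foreach { case (tp, d) =>
        println(s"${tp.topic}-${tp.partition}: error=${d.error}, bytes=${d.messageSetSize}")
      }
    }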
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Sat Oct 13 00:21:45 2012
@@ -26,7 +26,6 @@ import kafka.utils.Utils
 
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
-    val partition = buffer.getInt
     val error = buffer.getShort
     val initialOffset = buffer.getLong
     val hw = buffer.getLong
@@ -34,33 +33,35 @@ object FetchResponsePartitionData {
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new FetchResponsePartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer))
+    new FetchResponsePartitionData(error, initialOffset,
+                                   hw, new ByteBufferMessageSet(messageSetBuffer))
   }
 
   val headerSize =
-    4 + /* partition */
     2 + /* error code */
     8 + /* initialOffset */
     8 + /* high watermark */
     4 /* messageSetSize */
 }
 
-case class FetchResponsePartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
+case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError,
+                                      initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
 
   val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes.intValue()
 
-  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
+  def this(messages: MessageSet) = this(ErrorMapping.NoError, 0L, -1L, messages)
   
 }
 
 // SENDS
 
-class PartitionDataSend(val partitionData: FetchResponsePartitionData) extends Send {
+class PartitionDataSend(val partitionId: Int,
+                        val partitionData: FetchResponsePartitionData) extends Send {
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0L
 
-  private val buffer = ByteBuffer.allocate(FetchResponsePartitionData.headerSize)
-  buffer.putInt(partitionData.partition)
+  private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize)
+  buffer.putInt(partitionId)
   buffer.putShort(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
   buffer.putLong(partitionData.hw)
@@ -87,8 +88,9 @@ object TopicData {
     val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val partitionCount = buffer.getInt
     val topicPartitionDataPairs = (1 to partitionCount).map(_ => {
+      val partitionId = buffer.getInt
       val partitionData = FetchResponsePartitionData.readFrom(buffer)
-      (TopicAndPartition(topic, partitionData.partition), partitionData)
+      (partitionId, partitionData)
     })
     TopicData(topic, Map(topicPartitionDataPairs:_*))
   }
@@ -98,9 +100,9 @@ object TopicData {
     4 /* partition count */
 }
 
-case class TopicData(topic: String, partitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
+case class TopicData(topic: String, partitionData: Map[Int, FetchResponsePartitionData]) {
   val sizeInBytes =
-    TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes)
+    TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes + 4)
 
   val headerSize = TopicData.headerSize(topic)
 }
@@ -113,11 +115,12 @@ class TopicDataSend(val topicData: Topic
   override def complete = sent >= size
 
   private val buffer = ByteBuffer.allocate(topicData.headerSize)
-  Utils.writeShortString(buffer, topicData.topic, RequestOrResponse.DefaultCharset)
+  Utils.writeShortString(buffer, topicData.topic)
   buffer.putInt(topicData.partitionData.size)
   buffer.rewind()
 
-  val sends = new MultiSend(topicData.partitionData.toList.map(d => new PartitionDataSend(d._2))) {
+  val sends = new MultiSend(topicData.partitionData.toList
+                                    .map(d => new PartitionDataSend(d._1, d._2))) {
     val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize
   }
 
@@ -148,9 +151,10 @@ object FetchResponse {
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topicData = TopicData.readFrom(buffer)
-      topicData.partitionData.values.map(
-        partitionData => (TopicAndPartition(topicData.topic, partitionData.partition), partitionData)
-      )
+      topicData.partitionData.map {
+        case (partitionId, partitionData) =>
+          (TopicAndPartition(topicData.topic, partitionId), partitionData)
+      }
     })
     FetchResponse(versionId, correlationId, Map(pairs:_*))
   }
@@ -169,9 +173,10 @@ case class FetchResponse(versionId: Shor
   val sizeInBytes =
     FetchResponse.headerSize +
     dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
-      val topicData = TopicData(curr._1, curr._2)
-      folded +
-      topicData.sizeInBytes
+      val topicData = TopicData(curr._1, curr._2.map {
+        case (topicAndPartition, partitionData) => (topicAndPartition.partition, partitionData)
+      })
+      folded + topicData.sizeInBytes
     })
 
  private def partitionDataFor(topic: String, partition: Int): FetchResponsePartitionData = {
@@ -212,7 +217,8 @@ class FetchResponseSend(val fetchRespons
   buffer.rewind()
 
   val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map {
-    case(topic, data) => new TopicDataSend(TopicData(topic, data))
+    case(topic, data) => new TopicDataSend(TopicData(topic,
+                                                     data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)}))
   }) {
     val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize
   }

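A hedged illustration of the resulting fetch-response partition layout (a stand-alone sketch, not the actual Send implementation): PartitionDataSend now writes the 4-byte partition id itself, ahead of the header fields that FetchResponsePartitionData still owns, which is also why TopicData.sizeInBytes above gains a `+ 4` per partition.

    import java.nio.ByteBuffer

    // Stand-alone sketch of the per-partition wire layout after this patch. Field order
    // follows PartitionDataSend above: partitionId, error, initialOffset, hw, messageSetSize.
    object FetchWireLayoutSketch extends App {
      val headerSize = 2 /* error code */ + 8 /* initialOffset */ + 8 /* high watermark */ + 4 /* messageSetSize */

      def encode(partitionId: Int, error: Short, initialOffset: Long, hw: Long, messages: Array[Byte]): ByteBuffer = {
        val buffer = ByteBuffer.allocate(4 /* partitionId, written by the send layer */ + headerSize + messages.length)
        buffer.putInt(partitionId)
        buffer.putShort(error)
        buffer.putLong(initialOffset)
        buffer.putLong(hw)
        buffer.putInt(messages.length)
        buffer.put(messages)
        buffer.rewind()
        buffer
      }

      val buf = encode(0, 0, 0L, -1L, "hello".getBytes("UTF-8"))
      println(s"encoded ${buf.limit} bytes = 4 + $headerSize + 5")
    }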
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Sat Oct 13 00:21:45 2012
@@ -23,25 +23,6 @@ import kafka.utils._
 import scala.collection.Map
 import kafka.common.TopicAndPartition
 
-object ProducerRequestPartitionData {
-  def readFrom(buffer: ByteBuffer): ProducerRequestPartitionData = {
-    val partition = buffer.getInt
-    val messageSetSize = buffer.getInt
-    val messageSetBuffer = buffer.slice()
-    messageSetBuffer.limit(messageSetSize)
-    buffer.position(buffer.position + messageSetSize)
-    new ProducerRequestPartitionData(partition, new ByteBufferMessageSet(messageSetBuffer))
-  }
-
-  val headerSize =
-    4 + /* partition */
-    4 /* messageSetSize */
-}
-
-case class ProducerRequestPartitionData(partition: Int, messages: MessageSet) {
-
-  val sizeInBytes = ProducerRequestPartitionData.headerSize + messages.sizeInBytes.intValue()
-}
 
 object ProducerRequest {
   val CurrentVersion: Short = 0
@@ -63,8 +44,7 @@ object ProducerRequest {
         val messageSetSize = buffer.getInt
         val messageSetBuffer = new Array[Byte](messageSetSize)
         buffer.get(messageSetBuffer,0,messageSetSize)
-        (TopicAndPartition(topic, partition),
-         new ProducerRequestPartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
+        (TopicAndPartition(topic, partition), new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
       })
     })
 
@@ -77,7 +57,7 @@ case class ProducerRequest(versionId: Sh
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
-                           data: Map[TopicAndPartition, ProducerRequestPartitionData])
+                           data: Map[TopicAndPartition, MessageSet])
     extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
@@ -89,7 +69,7 @@ case class ProducerRequest(versionId: Sh
            clientId: String,
            requiredAcks: Short,
            ackTimeoutMs: Int,
-           data: Map[TopicAndPartition, ProducerRequestPartitionData]) =
+           data: Map[TopicAndPartition, MessageSet]) =
    this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) {
@@ -106,9 +86,10 @@ case class ProducerRequest(versionId: Sh
        Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic
         buffer.putInt(topicAndPartitionData.size) //the number of partitions
         topicAndPartitionData.foreach(partitionAndData => {
-          val partitionData = partitionAndData._2
-          val bytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer
-          buffer.putInt(partitionData.partition)
+          val partition = partitionAndData._1.partition
+          val partitionMessageData = partitionAndData._2
+          val bytes = partitionMessageData.asInstanceOf[ByteBufferMessageSet].buffer
+          buffer.putInt(partition)
           buffer.putInt(bytes.limit)
           buffer.put(bytes)
           bytes.rewind
@@ -132,7 +113,7 @@ case class ProducerRequest(versionId: Sh
           foldedPartitions +
           4 + /* partition id */
           4 + /* byte-length of serialized messages */
-          currPartition._2.messages.sizeInBytes.toInt
+          currPartition._2.sizeInBytes.toInt
         })
       }
     })

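For illustration only (simplified stand-in types; the real ProducerRequest takes more fields): after this change the request body is a plain Map[TopicAndPartition, MessageSet], and the partition id written to the wire comes from the map key rather than from the now-deleted ProducerRequestPartitionData.

    // Hypothetical stand-ins for kafka.common.TopicAndPartition and kafka.message.MessageSet.
    case class TopicAndPartition(topic: String, partition: Int)
    trait MessageSet { def sizeInBytes: Long }
    case class StubMessageSet(sizeInBytes: Long) extends MessageSet

    object ProducerRequestDataSketch extends App {
      val data: Map[TopicAndPartition, MessageSet] = Map(
        TopicAndPartition("my-topic", 0) -> StubMessageSet(128L),
        TopicAndPartition("my-topic", 1) -> StubMessageSet(256L))
      // Per-partition wire cost, mirroring the sizeInBytes accounting above:
      // 4 (partition id) + 4 (byte-length of serialized messages) + the messages themselves.
      val partitionBytes = data.foldLeft(0L) { case (acc, (_, ms)) => acc + 4 + 4 + ms.sizeInBytes }
      println(s"per-partition payload total: $partitionBytes bytes")
    }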
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Sat Oct 13 00:21:45 2012
@@ -38,7 +38,7 @@ class ConsumerFetcherManager(private val
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
-  private var partitionMap: immutable.Map[(String, Int), PartitionTopicInfo] = null
+  private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
   private val lock = new ReentrantLock
@@ -72,7 +72,7 @@ class ConsumerFetcherManager(private val
             // find the leader for this partition
             val leaderBrokerOpt = leaderForPartitionsMap.get((topic, partitionId))
             if(leaderBrokerOpt.isDefined){
-              val pti = partitionMap((topic, partitionId))
+              val pti = partitionMap(TopicAndPartition(topic, partitionId))
               addFetcher(topic, partitionId, pti.getFetchOffset(), leaderBrokerOpt.get)
               noLeaderPartitionSet.remove(TopicAndPartition(topic, partitionId))
             }
@@ -95,7 +95,7 @@ class ConsumerFetcherManager(private val
       throw new RuntimeException("%s already shutdown".format(name))
     lock.lock()
     try {
-      partitionMap = topicInfos.map(tpi => ((tpi.topic, tpi.partitionId), tpi)).toMap
+      partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
       this.cluster = cluster
       noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
       cond.signalAll()
@@ -119,11 +119,11 @@ class ConsumerFetcherManager(private val
     lock.unlock()
   }
 
-  def getPartitionTopicInfo(key: (String, Int)) : PartitionTopicInfo = {
+  def getPartitionTopicInfo(topicAndPartition: TopicAndPartition) : PartitionTopicInfo = {
     var pti :PartitionTopicInfo =null
     lock.lock()
     try {
-      pti = partitionMap(key)
+      pti = partitionMap(topicAndPartition)
     } finally {
       lock.unlock()
     }

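A small illustrative sketch (stand-in types, not the real kafka.consumer classes) of the data-structure change in this file: the fetcher manager's partition map is keyed by the TopicAndPartition case class instead of a raw (String, Int) tuple, keeping lookups self-documenting and consistent with the rest of the request path.

    case class TopicAndPartition(topic: String, partition: Int)
    // Stand-in for kafka.consumer.PartitionTopicInfo.
    case class PartitionTopicInfoStub(topic: String, partitionId: Int)

    object PartitionMapSketch extends App {
      val topicInfos = List(PartitionTopicInfoStub("t1", 0), PartitionTopicInfoStub("t1", 1))
      // Mirrors the new partitionMap construction above.
      val partitionMap: Map[TopicAndPartition, PartitionTopicInfoStub] =
        topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
      println(partitionMap(TopicAndPartition("t1", 0)))
    }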
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Sat Oct 13 00:21:45 2012
@@ -34,26 +34,25 @@ class ConsumerFetcherThread(name: String
           minBytes = config.minFetchBytes) {
 
   // process fetched data
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
-    val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionData.partition))
+  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
+    val pti = consumerFetcherManager.getPartitionTopicInfo(topicAndPartition)
     if (pti.getFetchOffset != fetchOffset)
      throw new RuntimeException("Offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d"
-                                .format(topic, partitionData.partition, pti.getFetchOffset, fetchOffset))
+                                .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
     pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
   }
 
   // handle a partition whose offset is out of range and return a new fetch offset
-  def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = {
+  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
     var startTimestamp : Long = 0
     config.autoOffsetReset match {
       case OffsetRequest.SmallestTimeString => startTimestamp = OffsetRequest.EarliestTime
       case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
       case _ => startTimestamp = OffsetRequest.LatestTime
     }
-    val topicAndPartition = TopicAndPartition(topic, partitionId)
    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
     val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-    val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionId))
+    val pti = consumerFetcherManager.getPartitionTopicInfo(topicAndPartition)
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)
     newOffset

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Sat Oct 13 00:21:45 2012
@@ -24,13 +24,14 @@ import java.nio.ByteBuffer
 import kafka.api._
 import kafka.common.TopicAndPartition
 import kafka.utils.{Logging, SystemTime}
+import kafka.message.ByteBufferMessageSet
 
 
 object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
 
   def getShutdownReceive() = {
-    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, ProducerRequestPartitionData]())
+    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, ByteBufferMessageSet]())
     val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
     byteBuffer.putShort(RequestKeys.ProduceKey)
     emptyProducerRequest.writeTo(byteBuffer)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Oct 13 00:21:45 2012
@@ -24,7 +24,7 @@ import kafka.serializer.Encoder
 import kafka.utils.{Utils, Logging}
 import scala.collection.{Seq, Map}
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.api.{TopicMetadata, ProducerRequest, ProducerRequestPartitionData}
+import kafka.api.{TopicMetadata, ProducerRequest}
 
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -207,7 +207,7 @@ class DefaultEventHandler[K,V](config: P
     } else if(messagesPerTopic.size > 0) {
       val topicPartitionDataPairs = messagesPerTopic.toSeq.map {
         case (topicAndPartition, messages) =>
-          (topicAndPartition, new ProducerRequestPartitionData(topicAndPartition.partition, messages))
+          (topicAndPartition, messages)
       }
       val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
         config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Sat Oct 13 00:21:45 2012
@@ -46,10 +46,11 @@ abstract class  AbstractFetcherThread(na
   /* callbacks to be defined in subclass */
 
   // process fetched data
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: FetchResponsePartitionData)
+  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long,
+                           partitionData: FetchResponsePartitionData)
 
   // handle a partition whose offset is out of range and return a new fetch offset
-  def handleOffsetOutOfRange(topic: String, partitionId: Int): Long
+  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long
 
   // deal with partitions with errors, potentially due to leadership changes
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
@@ -112,9 +113,9 @@ abstract class  AbstractFetcherThread(na
                  FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
                  fetcherMetrics.byteRate.mark(validBytes)
                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                  processPartitionData(topic, currentOffset.get, partitionData)
+                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                 case ErrorMapping.OffsetOutOfRangeCode =>
-                  val newOffset = handleOffsetOutOfRange(topic, partitionId)
+                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
                   fetchMap.put(topicAndPartition, newOffset)
                   warn("current offset %d for topic %s partition %d out of range; reset offset
to %d"
                     .format(currentOffset.get, topic, partitionId, newOffset))

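Purely illustrative (assumed, simplified types): the shape of the callbacks a fetcher-thread subclass implements after this patch, taking a single TopicAndPartition instead of separate topic and partitionId arguments.

    case class TopicAndPartition(topic: String, partition: Int)
    // Stand-in for kafka.api.FetchResponsePartitionData.
    case class FetchedPartitionData(error: Short, hw: Long)

    // Assumed, simplified mirror of the AbstractFetcherThread callback surface above.
    abstract class FetcherThreadSketch {
      def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long,
                               partitionData: FetchedPartitionData): Unit
      def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long
    }

    object FetcherThreadSketch extends App {
      val fetcher = new FetcherThreadSketch {
        def processPartitionData(tp: TopicAndPartition, fetchOffset: Long, data: FetchedPartitionData): Unit =
          println(s"processed ${tp.topic}-${tp.partition} at offset $fetchOffset (hw=${data.hw})")
        def handleOffsetOutOfRange(tp: TopicAndPartition): Long = 0L // e.g. reset to earliest
      }
      fetcher.processPartitionData(TopicAndPartition("t", 0), 42L, FetchedPartitionData(0, 100L))
    }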
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Oct 13 00:21:45 2012
@@ -101,9 +101,8 @@ class KafkaApis(val requestChannel: Requ
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
    */
-  def maybeUnblockDelayedFetchRequests(topic: String, partitionData: ProducerRequestPartitionData) {
-    val partition = partitionData.partition
-    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), partitionData)
+  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messages: MessageSet) {
+    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), messages)
    trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
 
     // send any newly unblocked responses
@@ -129,7 +128,7 @@ class KafkaApis(val requestChannel: Requ
 
     val numPartitionsInError = localProduceResults.count(_.error.isDefined)
     produceRequest.data.foreach(partitionAndData =>
-      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2))
+      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2))
 
     val allPartitionHaveReplicationFactorOne =
       !produceRequest.data.keySet.exists(
@@ -182,34 +181,33 @@ class KafkaApis(val requestChannel: Requ
   /**
    * Helper method for handling a parsed producer request
    */
-  private def appendToLocalLog(messages: Map[TopicAndPartition, ProducerRequestPartitionData]): Iterable[ProduceResult] = {
-    trace("Append [%s] to local log ".format(messages.toString))
-    messages.map (data => {
-      val (key, partitionData) = data
-      BrokerTopicStat.getBrokerTopicStat(key.topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
-      BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes)
- 
+  private def appendToLocalLog(partitionAndData: Map[TopicAndPartition, MessageSet]): Iterable[ProduceResult] = {
+    trace("Append [%s] to local log ".format(partitionAndData.toString))
+    partitionAndData.map {case (topicAndPartition, messages) =>
+      BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+      BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(messages.sizeInBytes)
+
       try {
-        val localReplica = replicaManager.getLeaderReplicaIfLocal(key.topic, key.partition)
+        val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
         val log = localReplica.log.get
-        val (start, end) = log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
+        val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
         // we may need to increment high watermark since ISR could be down to 1
         localReplica.partition.maybeIncrementLeaderHW(localReplica)
         trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset
%d"
-              .format(partitionData.messages.sizeInBytes, key.topic, key.partition, start,
end))
-        ProduceResult(key, start, end)
+              .format(messages.size, topicAndPartition.topic, topicAndPartition.partition,
start, end))
+        ProduceResult(topicAndPartition, start, end)
       } catch {
         case e: KafkaStorageException =>
           fatal("Halting due to unrecoverable I/O error while handling produce request: ",
e)
           Runtime.getRuntime.halt(1)
           null
-        case e => 
-          BrokerTopicStat.getBrokerTopicStat(key.topic).failedProduceRequestRate.mark()
+        case e =>
+          BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).failedProduceRequestRate.mark()
           BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
-          error("Error processing ProducerRequest on %s:%d".format(key.topic, key.partition),
e)
-          new ProduceResult(key, e)
+          error("Error processing ProducerRequest on %s:%d".format(topicAndPartition.topic,
topicAndPartition.partition), e)
+          new ProduceResult(topicAndPartition, e)
        }
-    })
+    }
   }
 
   /**
@@ -266,28 +264,29 @@ class KafkaApis(val requestChannel: Requ
    */
   private def readMessageSets(fetchRequest: FetchRequest) = {
     val isFetchFromFollower = fetchRequest.isFromFollower
-    fetchRequest.requestInfo.map {
+    fetchRequest.requestInfo.map
+    {
       case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
-        val partitionData = try {
-          val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
-          BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
-          BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
-          if (!isFetchFromFollower) {
-            new FetchResponsePartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
-          } else {
-            debug("Leader %d for topic %s partition %d received fetch request from follower
%d"
-                          .format(brokerId, topic, partition, fetchRequest.replicaId))
-
-            new FetchResponsePartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+        val partitionData =
+          try {
+            val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
+            BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
+            BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
+            if (!isFetchFromFollower) {
+              new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages)
+            } else {
+              debug("Leader %d for topic %s partition %d received fetch request from follower
%d"
+                            .format(brokerId, topic, partition, fetchRequest.replicaId))
+              new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages)
+            }
+          } catch {
+            case t: Throwable =>
+              BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
+              BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
+              error("error when processing request " + (topic, partition, offset, fetchSize),
t)
+              new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
+                                             offset, -1L, MessageSet.Empty)
+          }
-        } catch {
-          case t: Throwable =>
-            BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
-            BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
-            error("error when processing request " + (topic, partition, offset, fetchSize),
t)
-            new FetchResponsePartitionData(partition, ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
-                              offset, -1L, MessageSet.Empty)
-        }
         (TopicAndPartition(topic, partition), partitionData)
     }
   }
@@ -295,8 +294,11 @@ class KafkaApis(val requestChannel: Requ
   /**
    * Read from a single topic/partition at the given offset upto maxSize bytes
    */
-  private def readMessageSet(topic: String, partition: Int, offset: Long,
-                             maxSize: Int, fromReplicaId: Int): (MessageSet, Long) = {
+  private def readMessageSet(topic: String, 
+                             partition: Int, 
+                             offset: Long,
+                             maxSize: Int, 
+                             fromReplicaId: Int): (MessageSet, Long) = {
     // check if the current broker is the leader for the partitions
     val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
       replicaManager.getReplicaOrException(topic, partition)
@@ -438,14 +440,14 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, ProducerRequestPartitionData](brokerId) {
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, MessageSet](brokerId) {
     this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
     /**
     * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
-    def checkSatisfied(partitionData: ProducerRequestPartitionData, delayedFetch: DelayedFetch): Boolean = {
-      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(partitionData.messages.sizeInBytes)
+    def checkSatisfied(messages: MessageSet, delayedFetch: DelayedFetch): Boolean = {
+      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messages.sizeInBytes)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
 
@@ -537,8 +539,8 @@ class KafkaApis(val requestChannel: Requ
           fetchPartitionStatus.error = ErrorMapping.NoError
         }
         if (!fetchPartitionStatus.acksPending) {
-          val partitionData = produce.data(followerFetchRequestKey.topicAndPartition)
-          maybeUnblockDelayedFetchRequests(topic, partitionData)
+          val messages = produce.data(followerFetchRequestKey.topicAndPartition)
+          maybeUnblockDelayedFetchRequests(topic, partitionId, messages)
         }
       }
 

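A minimal sketch of the purgatory change (assumed, simplified types, not the real kafka.server classes): the delayed-fetch satisfaction check now accumulates plain MessageSet sizes instead of unwrapping a ProducerRequestPartitionData.

    import java.util.concurrent.atomic.AtomicLong

    // Stand-in for the DelayedFetch bookkeeping used by checkSatisfied above.
    case class DelayedFetchSketch(minBytes: Int) {
      val bytesAccumulated = new AtomicLong(0)
    }

    object PurgatorySketch extends App {
      // Satisfied once enough produced bytes have accumulated to meet min_bytes.
      def checkSatisfied(messageSizeInBytes: Long, delayedFetch: DelayedFetchSketch): Boolean =
        delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes) >= delayedFetch.minBytes

      val delayed = DelayedFetchSketch(minBytes = 100)
      println(checkSatisfied(60, delayed)) // false: 60 bytes so far
      println(checkSatisfied(60, delayed)) // true: 120 bytes accumulated
    }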
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Sat Oct 13 00:21:45 2012
@@ -30,8 +30,9 @@ class ReplicaFetcherThread(name:String, 
     minBytes = brokerConfig.replicaMinBytes) {
 
   // process fetched data
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
-    val partitionId = partitionData.partition
+  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
+    val topic = topicAndPartition.topic
+    val partitionId = topicAndPartition.partition
+    val topic = topicAndPartition.topic
+    val partitionId = topicAndPartition.partition
     val replica = replicaMgr.getReplica(topic, partitionId).get
     val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
 
@@ -49,15 +50,14 @@ class ReplicaFetcherThread(name:String, 
   }
 
   // handle a partition whose offset is out of range and return a new fetch offset
-  def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = {
+  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
     // This means the local replica is out of date. Truncate the log and catch up from beginning.
-    val topicAndPartition = TopicAndPartition(topic, partitionId)
     val request = OffsetRequest(
       replicaId = brokerConfig.brokerId,
      requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
     )
     val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-    val replica = replicaMgr.getReplica(topic, partitionId).get
+    val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
     replica.log.get.truncateAndStartWithNewOffset(offset)
     offset
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala Sat Oct 13 00:21:45 2012
@@ -34,29 +34,32 @@ object SerializationTestUtils{
   private val isr1 = List(0, 1, 2)
   private val leader2 = 0
   private val isr2 = List(0, 2, 3)
-  private val partitionDataFetchResponse0 = new FetchResponsePartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
-  private val partitionDataFetchResponse1 = new FetchResponsePartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
-  private val partitionDataFetchResponse2 = new FetchResponsePartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
-  private val partitionDataFetchResponse3 = new FetchResponsePartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
-  private val partitionDataFetchResponseArray = Array(partitionDataFetchResponse0, partitionDataFetchResponse1, partitionDataFetchResponse2, partitionDataFetchResponse3)
+  private val partitionDataFetchResponse0 = new FetchResponsePartitionData(new ByteBufferMessageSet(new Message("first message".getBytes)))
+  private val partitionDataFetchResponse1 = new FetchResponsePartitionData(new ByteBufferMessageSet(new Message("second message".getBytes)))
+  private val partitionDataFetchResponse2 = new FetchResponsePartitionData(new ByteBufferMessageSet(new Message("third message".getBytes)))
+  private val partitionDataFetchResponse3 = new FetchResponsePartitionData(new ByteBufferMessageSet(new Message("fourth message".getBytes)))
+  private val partitionDataFetchResponseMap = Map((0, partitionDataFetchResponse0), (1, partitionDataFetchResponse1), (2, partitionDataFetchResponse2), (3, partitionDataFetchResponse3))
 
   private val topicDataFetchResponse = {
     val groupedData = Array(topic1, topic2).flatMap(topic =>
-      partitionDataFetchResponseArray.map(partitionData =>
-        (TopicAndPartition(topic, partitionData.partition), partitionData)))
+      partitionDataFetchResponseMap.map(partitionAndData =>
+        (TopicAndPartition(topic, partitionAndData._1), partitionAndData._2)))
     collection.immutable.Map(groupedData:_*)
   }
 
-  private val partitionDataProducerRequest0 = new ProducerRequestPartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
-  private val partitionDataProducerRequest1 = new ProducerRequestPartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
-  private val partitionDataProducerRequest2 = new ProducerRequestPartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
-  private val partitionDataProducerRequest3 = new ProducerRequestPartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
-  private val partitionDataProducerRequestArray = Array(partitionDataProducerRequest0, partitionDataProducerRequest1, partitionDataProducerRequest2, partitionDataProducerRequest3)
+  private val partitionDataMessage0 = new ByteBufferMessageSet(new Message("first message".getBytes))
+  private val partitionDataMessage1 = new ByteBufferMessageSet(new Message("second message".getBytes))
+  private val partitionDataMessage2 = new ByteBufferMessageSet(new Message("third message".getBytes))
+  private val partitionDataMessage3 = new ByteBufferMessageSet(new Message("fourth message".getBytes))
+  private val partitionDataProducerRequestArray = Array(partitionDataMessage0, partitionDataMessage1, partitionDataMessage2, partitionDataMessage3)
 
   private val topicDataProducerRequest = {
     val groupedData = Array(topic1, topic2).flatMap(topic =>
-      partitionDataProducerRequestArray.map(partitionData =>
-        (TopicAndPartition(topic, partitionData.partition), partitionData)))
+      partitionDataProducerRequestArray.zipWithIndex.map
+      {
+        case(partitionDataMessage, partition) =>
+          (TopicAndPartition(topic, partition), partitionDataMessage)
+      })
     collection.immutable.Map(groupedData:_*)
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Sat Oct 13 00:21:45 2012
@@ -25,9 +25,10 @@ import kafka.utils.TestUtils
 import java.util.Random
 import junit.framework.Assert._
 import kafka.producer.SyncProducerConfig
-import kafka.api.{ProducerRequestPartitionData, ProducerRequest}
+import kafka.api.ProducerRequest
 import java.nio.ByteBuffer
 import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
 
 
 class SocketServerTest extends JUnitSuite {
@@ -78,7 +79,7 @@ class SocketServerTest extends JUnitSuit
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
     val emptyRequest =
-      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ProducerRequestPartitionData]())
+      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
 
     val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
     emptyRequest.writeTo(byteBuffer)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Sat Oct 13 00:21:45 2012
@@ -21,14 +21,13 @@ import java.net.SocketTimeoutException
 import java.util.Properties
 import junit.framework.Assert
 import kafka.admin.CreateTopicCommand
-import kafka.common.{ErrorMapping}
 import kafka.integration.KafkaServerTestHarness
 import kafka.message._
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.{ProducerResponseStatus, ProducerRequestPartitionData}
+import kafka.api.ProducerResponseStatus
 import kafka.common.{TopicAndPartition, ErrorMapping}
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -86,7 +85,7 @@ class SyncProducerTest extends JUnit3Sui
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ProducerRequestPartitionData]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, MessageSet]())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1397744&r1=1397743&r2=1397744&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sat Oct 13 00:21:45 2012
@@ -367,7 +367,7 @@ object TestUtils extends Logging {
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val data = topics.flatMap(topic =>
-      partitions.map(partition => (TopicAndPartition(topic,  partition), new ProducerRequestPartitionData(partition, message)))
+      partitions.map(partition => (TopicAndPartition(topic,  partition), message))
     )
     new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }


