kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joest...@apache.org
Subject svn commit: r1394587 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/consumer/ main/scala/kafka/network/ main/scala/kafka/producer/async/ main/scala/kafka/server/ test/scala/unit/kafka/network/ test/scala/unit/kafka/...
Date Fri, 05 Oct 2012 15:27:05 GMT
Author: joestein
Date: Fri Oct  5 15:27:05 2012
New Revision: 1394587

URL: http://svn.apache.org/viewvc?rev=1394587&view=rev
Log:
KAFKA-508 split out partiondata from fetchresponse and producerrequest

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Fri Oct
 5 15:27:05 2012
@@ -24,8 +24,8 @@ import kafka.message.{MessageSet, ByteBu
 import kafka.network.{MultiSend, Send}
 import kafka.utils.Utils
 
-object PartitionData {
-  def readFrom(buffer: ByteBuffer): PartitionData = {
+object FetchResponsePartitionData {
+  def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
     val partition = buffer.getInt
     val error = buffer.getShort
     val initialOffset = buffer.getLong
@@ -34,7 +34,7 @@ object PartitionData {
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer,
initialOffset))
+    new FetchResponsePartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer,
initialOffset))
   }
 
   val headerSize =
@@ -45,20 +45,20 @@ object PartitionData {
     4 /* messageSetSize */
 }
 
-case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long
= 0L, hw: Long = -1L, messages: MessageSet) {
+case class FetchResponsePartitionData(partition: Int, error: Short = ErrorMapping.NoError,
initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
 
-  val sizeInBytes = PartitionData.headerSize + messages.sizeInBytes.intValue()
+  val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes.intValue()
 
   def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError,
0L, -1L, messages)
 }
 
 // SENDS
 
-class PartitionDataSend(val partitionData: PartitionData) extends Send {
+class PartitionDataSend(val partitionData: FetchResponsePartitionData) extends Send {
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0L
 
-  private val buffer = ByteBuffer.allocate(PartitionData.headerSize)
+  private val buffer = ByteBuffer.allocate(FetchResponsePartitionData.headerSize)
   buffer.putInt(partitionData.partition)
   buffer.putShort(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
@@ -86,7 +86,7 @@ object TopicData {
     val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val partitionCount = buffer.getInt
     val topicPartitionDataPairs = (1 to partitionCount).map(_ => {
-      val partitionData = PartitionData.readFrom(buffer)
+      val partitionData = FetchResponsePartitionData.readFrom(buffer)
       (TopicAndPartition(topic, partitionData.partition), partitionData)
     })
     TopicData(topic, Map(topicPartitionDataPairs:_*))
@@ -97,7 +97,7 @@ object TopicData {
     4 /* partition count */
 }
 
-case class TopicData(topic: String, partitionData: Map[TopicAndPartition, PartitionData])
{
+case class TopicData(topic: String, partitionData: Map[TopicAndPartition, FetchResponsePartitionData])
{
   val sizeInBytes =
     TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes)
 
@@ -158,7 +158,7 @@ object FetchResponse {
 
 case class FetchResponse(versionId: Short,
                          correlationId: Int,
-                         data: Map[TopicAndPartition, PartitionData])  {
+                         data: Map[TopicAndPartition, FetchResponsePartitionData])  {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
@@ -173,7 +173,7 @@ case class FetchResponse(versionId: Shor
       topicData.sizeInBytes
     })
 
-  private def partitionDataFor(topic: String, partition: Int): PartitionData = {
+  private def partitionDataFor(topic: String, partition: Int): FetchResponsePartitionData
= {
     val topicAndPartition = TopicAndPartition(topic, partition)
     data.get(topicAndPartition) match {
       case Some(partitionData) => partitionData

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Fri Oct
 5 15:27:05 2012
@@ -23,6 +23,25 @@ import kafka.utils._
 import scala.collection.Map
 import kafka.common.TopicAndPartition
 
+object ProducerRequestPartitionData {
+  def readFrom(buffer: ByteBuffer): ProducerRequestPartitionData = {
+    val partition = buffer.getInt
+    val messageSetSize = buffer.getInt
+    val messageSetBuffer = buffer.slice()
+    messageSetBuffer.limit(messageSetSize)
+    buffer.position(buffer.position + messageSetSize)
+    new ProducerRequestPartitionData(partition, new ByteBufferMessageSet(messageSetBuffer))
+  }
+
+  val headerSize =
+    4 + /* partition */
+    4 /* messageSetSize */
+}
+
+case class ProducerRequestPartitionData(partition: Int, messages: MessageSet) {
+
+  val sizeInBytes = ProducerRequestPartitionData.headerSize + messages.sizeInBytes.intValue()
+}
 
 object ProducerRequest {
   val CurrentVersion: Short = 0
@@ -45,7 +64,7 @@ object ProducerRequest {
         val messageSetBuffer = new Array[Byte](messageSetSize)
         buffer.get(messageSetBuffer,0,messageSetSize)
         (TopicAndPartition(topic, partition),
-         new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
+         new ProducerRequestPartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
       })
     })
 
@@ -58,7 +77,7 @@ case class ProducerRequest( versionId: S
                             clientId: String,
                             requiredAcks: Short,
                             ackTimeoutMs: Int,
-                            data: Map[TopicAndPartition, PartitionData])
+                            data: Map[TopicAndPartition, ProducerRequestPartitionData])
     extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
@@ -70,7 +89,7 @@ case class ProducerRequest( versionId: S
            clientId: String,
            requiredAcks: Short,
            ackTimeoutMs: Int,
-           data: Map[TopicAndPartition, PartitionData]) =
+           data: Map[TopicAndPartition, ProducerRequestPartitionData]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs,
data)
 
   def writeTo(buffer: ByteBuffer) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
Fri Oct  5 15:27:05 2012
@@ -20,7 +20,7 @@ package kafka.consumer
 import kafka.cluster.Broker
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, PartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, FetchResponsePartitionData}
 import kafka.common.TopicAndPartition
 
 
@@ -34,7 +34,7 @@ class ConsumerFetcherThread(name: String
           minBytes = config.minFetchBytes) {
 
   // process fetched data
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData)
{
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: FetchResponsePartitionData)
{
     val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionData.partition))
     if (pti.getFetchOffset != fetchOffset)
       throw new RuntimeException("offset doesn't match for topic %s partition: %d pti offset:
%d fetch ofset: %d"

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Fri
Oct  5 15:27:05 2012
@@ -30,7 +30,7 @@ object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
 
   def getShutdownReceive() = {
-    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition,
PartitionData]())
+    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition,
ProducerRequestPartitionData]())
     val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
     byteBuffer.putShort(RequestKeys.ProduceKey)
     emptyProducerRequest.writeTo(byteBuffer)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
Fri Oct  5 15:27:05 2012
@@ -24,7 +24,7 @@ import kafka.serializer.Encoder
 import kafka.utils.{Utils, Logging}
 import scala.collection.{Seq, Map}
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.api.{TopicMetadata, ProducerRequest, PartitionData}
+import kafka.api.{TopicMetadata, ProducerRequest, ProducerRequestPartitionData}
 
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -207,7 +207,7 @@ class DefaultEventHandler[K,V](config: P
     } else if(messagesPerTopic.size > 0) {
       val topicPartitionDataPairs = messagesPerTopic.toSeq.map {
         case (topicAndPartition, messages) =>
-          (topicAndPartition, new PartitionData(topicAndPartition.partition, messages))
+          (topicAndPartition, new ProducerRequestPartitionData(topicAndPartition.partition,
messages))
       }
       val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
         config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Fri Oct  5 15:27:05 2012
@@ -22,7 +22,7 @@ import kafka.consumer.SimpleConsumer
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
+import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
@@ -44,7 +44,7 @@ abstract class  AbstractFetcherThread(na
   // callbacks to be defined in subclass
 
   // process fetched data
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData)
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: FetchResponsePartitionData)
 
   // handle a partition whose offset is out of range and return a new fetch offset
   def handleOffsetOutOfRange(topic: String, partitionId: Int): Long

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Oct
 5 15:27:05 2012
@@ -96,7 +96,7 @@ class KafkaApis(val requestChannel: Requ
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
    */
-  def maybeUnblockDelayedFetchRequests(topic: String, partitionData: PartitionData) {
+  def maybeUnblockDelayedFetchRequests(topic: String, partitionData: ProducerRequestPartitionData)
{
     val partition = partitionData.partition
     val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), null)
     trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition,
satisfied.size))
@@ -284,7 +284,7 @@ class KafkaApis(val requestChannel: Requ
 
   /**
    * Read from all the offset details given and return a map of
-   * (topic, partition) -> PartitionData
+   * (topic, partition) -> FetchResponsePartitionData
    */
   private def readMessageSets(fetchRequest: FetchRequest) = {
     val isFetchFromFollower = fetchRequest.isFromFollower
@@ -295,13 +295,13 @@ class KafkaApis(val requestChannel: Requ
           BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
           BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
           if (!isFetchFromFollower) {
-            new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+            new FetchResponsePartitionData(partition, ErrorMapping.NoError, offset, highWatermark,
messages)
           } else {
             debug("Leader %d for topic %s partition %d received fetch request from follower
%d"
                           .format(brokerId, topic, partition, fetchRequest.replicaId))
             debug("Leader %d returning %d messages for topic %s partition %d to follower
%d"
                           .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
-            new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+            new FetchResponsePartitionData(partition, ErrorMapping.NoError, offset, highWatermark,
messages)
           }
         }
         catch {
@@ -309,7 +309,7 @@ class KafkaApis(val requestChannel: Requ
             BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
             BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
             error("error when processing request " + (topic, partition, offset, fetchSize),
t)
-            new PartitionData(partition, ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
+            new FetchResponsePartitionData(partition, ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
                               offset, -1L, MessageSet.Empty)
         }
         (TopicAndPartition(topic, partition), partitionData)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
Fri Oct  5 15:27:05 2012
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, PartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
 import kafka.common.TopicAndPartition
@@ -30,7 +30,7 @@ class ReplicaFetcherThread(name:String, 
     minBytes = brokerConfig.replicaMinBytes) {
 
   // process fetched data
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData)
{
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: FetchResponsePartitionData)
{
     val partitionId = partitionData.partition
     val replica = replicaMgr.getReplica(topic, partitionId).get
     val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
Fri Oct  5 15:27:05 2012
@@ -35,15 +35,28 @@ object RpcDataSerializationTestUtils{
   private val isr1 = List(0, 1, 2)
   private val leader2 = 0
   private val isr2 = List(0, 2, 3)
-  private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first
message".getBytes)))
-  private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second
message".getBytes)))
-  private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third
message".getBytes)))
-  private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth
message".getBytes)))
-  private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2,
partitionData3)
+  private val partitionDataFetchResponse0 = new FetchResponsePartitionData(0, new ByteBufferMessageSet(new
Message("first message".getBytes)))
+  private val partitionDataFetchResponse1 = new FetchResponsePartitionData(1, new ByteBufferMessageSet(new
Message("second message".getBytes)))
+  private val partitionDataFetchResponse2 = new FetchResponsePartitionData(2, new ByteBufferMessageSet(new
Message("third message".getBytes)))
+  private val partitionDataFetchResponse3 = new FetchResponsePartitionData(3, new ByteBufferMessageSet(new
Message("fourth message".getBytes)))
+  private val partitionDataFetchResponseArray = Array(partitionDataFetchResponse0, partitionDataFetchResponse1,
partitionDataFetchResponse2, partitionDataFetchResponse3)
 
-  private val topicData = {
+  private val topicDataFetchResponse = {
     val groupedData = Array(topic1, topic2).flatMap(topic =>
-      partitionDataArray.map(partitionData =>
+      partitionDataFetchResponseArray.map(partitionData =>
+        (TopicAndPartition(topic, partitionData.partition), partitionData)))
+    collection.immutable.Map(groupedData:_*)
+  }
+
+  private val partitionDataProducerRequest0 = new ProducerRequestPartitionData(0, new ByteBufferMessageSet(new
Message("first message".getBytes)))
+  private val partitionDataProducerRequest1 = new ProducerRequestPartitionData(1, new ByteBufferMessageSet(new
Message("second message".getBytes)))
+  private val partitionDataProducerRequest2 = new ProducerRequestPartitionData(2, new ByteBufferMessageSet(new
Message("third message".getBytes)))
+  private val partitionDataProducerRequest3 = new ProducerRequestPartitionData(3, new ByteBufferMessageSet(new
Message("fourth message".getBytes)))
+  private val partitionDataProducerRequestArray = Array(partitionDataProducerRequest0, partitionDataProducerRequest1,
partitionDataProducerRequest2, partitionDataProducerRequest3)
+
+  private val topicDataProducerRequest = {
+    val groupedData = Array(topic1, topic2).flatMap(topic =>
+      partitionDataProducerRequestArray.map(partitionData =>
         (TopicAndPartition(topic, partitionData.partition), partitionData)))
     collection.immutable.Map(groupedData:_*)
   }
@@ -92,7 +105,7 @@ object RpcDataSerializationTestUtils{
   }
 
   def createTestProducerRequest: ProducerRequest = {
-    new ProducerRequest(1, "client 1", 0, 1000, topicData)
+    new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)
   }
 
   def createTestProducerResponse: ProducerResponse =
@@ -106,7 +119,7 @@ object RpcDataSerializationTestUtils{
   }
 
   def createTestFetchResponse: FetchResponse = {
-    FetchResponse(1, 1, topicData)
+    FetchResponse(1, 1, topicDataFetchResponse)
   }
 
   def createTestOffsetRequest = new OffsetRequest(

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
Fri Oct  5 15:27:05 2012
@@ -25,7 +25,7 @@ import kafka.utils.TestUtils
 import java.util.Random
 import junit.framework.Assert._
 import kafka.producer.SyncProducerConfig
-import kafka.api.{PartitionData, ProducerRequest}
+import kafka.api.{ProducerRequestPartitionData, ProducerRequest}
 import java.nio.ByteBuffer
 import kafka.common.TopicAndPartition
 
@@ -78,7 +78,7 @@ class SocketServerTest extends JUnitSuit
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
     val emptyRequest =
-      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition,
PartitionData]())
+      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition,
ProducerRequestPartitionData]())
 
     val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
     emptyRequest.writeTo(byteBuffer)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
Fri Oct  5 15:27:05 2012
@@ -27,7 +27,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.{ProducerResponseStatus, PartitionData}
+import kafka.api.{ProducerResponseStatus, ProducerRequestPartitionData}
 import kafka.common.{TopicAndPartition, ErrorMapping}
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -85,7 +85,7 @@ class SyncProducerTest extends JUnit3Sui
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs,
Map[TopicAndPartition, PartitionData]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs,
Map[TopicAndPartition, ProducerRequestPartitionData]())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1394587&r1=1394586&r2=1394587&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Fri
Oct  5 15:27:05 2012
@@ -367,7 +367,7 @@ object TestUtils extends Logging {
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val data = topics.flatMap(topic =>
-      partitions.map(partition => (TopicAndPartition(topic,  partition), new PartitionData(partition,
message)))
+      partitions.map(partition => (TopicAndPartition(topic,  partition), new ProducerRequestPartitionData(partition,
message)))
     )
     new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }



Mime
View raw message