kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1414917 - in /kafka/branches/0.8/core/src: main/scala/kafka/api/ProducerRequest.scala main/scala/kafka/producer/async/DefaultEventHandler.scala test/scala/unit/kafka/producer/SyncProducerTest.scala
Date Wed, 28 Nov 2012 21:05:18 GMT
Author: junrao
Date: Wed Nov 28 21:05:17 2012
New Revision: 1414917

URL: http://svn.apache.org/viewvc?rev=1414917&view=rev
Log:
ProducerRequest should take ByteBufferMessageSet instead of MessageSet; patched by Jun Rao;
reviewed by Neha Narkhede; KAFKA-632

Modified:
    kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala

Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1414917&r1=1414916&r2=1414917&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Wed Nov 28 21:05:17
2012
@@ -57,7 +57,7 @@ case class ProducerRequest(versionId: Sh
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
-                           data: Map[TopicAndPartition, MessageSet])
+                           data: Map[TopicAndPartition, ByteBufferMessageSet])
     extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
@@ -69,7 +69,7 @@ case class ProducerRequest(versionId: Sh
            clientId: String,
            requiredAcks: Short,
            ackTimeoutMs: Int,
-           data: Map[TopicAndPartition, MessageSet]) =
+           data: Map[TopicAndPartition, ByteBufferMessageSet]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs,
data)
 
   def writeTo(buffer: ByteBuffer) {
@@ -88,7 +88,7 @@ case class ProducerRequest(versionId: Sh
         topicAndPartitionData.foreach(partitionAndData => {
           val partition = partitionAndData._1.partition
           val partitionMessageData = partitionAndData._2
-          val bytes = partitionMessageData.asInstanceOf[ByteBufferMessageSet].buffer
+          val bytes = partitionMessageData.buffer
           buffer.putInt(partition)
           buffer.putInt(bytes.limit)
           buffer.put(bytes)

Modified: kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1414917&r1=1414916&r2=1414917&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
(original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
Wed Nov 28 21:05:17 2012
@@ -210,12 +210,8 @@ class DefaultEventHandler[K,V](config: P
       warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
       messagesPerTopic.keys.toSeq
     } else if(messagesPerTopic.size > 0) {
-      val topicPartitionDataPairs = messagesPerTopic.toSeq.map {
-        case (topicAndPartition, messages) =>
-          (topicAndPartition, messages)
-      }
       val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
-        config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))
+        config.requestTimeoutMs, messagesPerTopic)
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)

Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1414917&r1=1414916&r2=1414917&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Wed
Nov 28 21:05:17 2012
@@ -85,7 +85,7 @@ class SyncProducerTest extends JUnit3Sui
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs,
Map[TopicAndPartition, MessageSet]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs,
Map[TopicAndPartition, ByteBufferMessageSet]())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)



Mime
View raw message