kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1205313 - in /incubator/kafka/trunk/core/src/main/scala/kafka: api/ javaapi/message/ message/ network/ server/
Date Wed, 23 Nov 2011 07:29:38 GMT
Author: jkreps
Date: Wed Nov 23 07:29:37 2011
New Revision: 1205313

URL: http://svn.apache.org/viewvc?rev=1205313&view=rev
Log:
KAFKA-171 Do a single write for request sends.


Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1205313&r1=1205312&r2=1205313&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/OffsetRequest.scala Wed Nov 23 07:29:37 2011
@@ -20,7 +20,7 @@ package kafka.api
 import java.nio.ByteBuffer
 import kafka.utils.{nonthreadsafe, Utils}
 import kafka.network.{Send, Request}
-import java.nio.channels.WritableByteChannel
+import java.nio.channels.GatheringByteChannel
 import kafka.common.ErrorMapping
 
 object OffsetRequest {
@@ -85,7 +85,7 @@ private[kafka] class OffsetArraySend(off
 
   var complete: Boolean = false
 
-  def writeTo(channel: WritableByteChannel): Int = {
+  def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete()
     var written = 0
     if(header.hasRemaining)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1205313&r1=1205312&r2=1205313&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Wed Nov 23 07:29:37 2011
@@ -17,7 +17,7 @@
 
 package kafka.javaapi.message
 
-import java.nio.channels.WritableByteChannel
+import java.nio.channels._
 import kafka.message.{MessageAndOffset, InvalidMessageException, Message}
 
 /**

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1205313&r1=1205312&r2=1205313&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Wed Nov 23 07:29:37 2011
@@ -21,7 +21,7 @@ import scala.collection.mutable
 import kafka.utils.Logging
 import kafka.common.{InvalidMessageSizeException, ErrorMapping}
 import java.nio.ByteBuffer
-import java.nio.channels.WritableByteChannel
+import java.nio.channels._
 import kafka.utils.IteratorTemplate
 
 /**
@@ -71,7 +71,7 @@ class ByteBufferMessageSet(private val b
   }
   
   /** Write the messages in this set to the given channel */
-  def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long =
+  def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long =
     channel.write(buffer.duplicate)
   
   override def iterator: Iterator[MessageAndOffset] = deepIterator

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1205313&r1=1205312&r2=1205313&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala Wed Nov 23 07:29:37 2011
@@ -100,7 +100,7 @@ class FileMessageSet private[kafka](priv
   /**
    * Write some of this set to the given channel, return the ammount written
    */
-  def writeTo(destChannel: WritableByteChannel, writeOffset: Long, size: Long): Long = 
+  def writeTo(destChannel: GatheringByteChannel, writeOffset: Long, size: Long): Long = 
     channel.transferTo(offset + writeOffset, scala.math.min(size, sizeInBytes), destChannel)
   
   /**

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala?rev=1205313&r1=1205312&r2=1205313&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/MessageSet.scala Wed Nov 23 07:29:37 2011
@@ -89,7 +89,7 @@ abstract class MessageSet extends Iterab
   /** Write the messages in this set to the given channel starting at the given offset byte.
     * Less than the complete amount may be written, but no more than maxSize can be. The number
     * of bytes written is returned */
-  def writeTo(channel: WritableByteChannel, offset: Long, maxSize: Long): Long
+  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Long): Long
   
   /**
    * Provides an iterator over the messages in this set

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala?rev=1205313&r1=1205312&r2=1205313&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala Wed Nov 23 07:29:37 2011
@@ -25,10 +25,14 @@ import kafka.utils._
 private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send {
   
   private var sizeBuffer = ByteBuffer.allocate(4)
-  
+
+  // Avoid possibility of overflow for 2GB-4 byte buffer
+  if(buffer.remaining > Int.MaxValue - sizeBuffer.limit)
+    throw new IllegalArgumentException("Attempt to create a bounded buffer of " + buffer.remaining + " bytes, but the maximum " +
+                                       "allowable size for a bounded buffer is " + (Int.MaxValue - sizeBuffer.limit) + ".")    
   sizeBuffer.putInt(buffer.limit)
   sizeBuffer.rewind()
-  
+
   var complete: Boolean = false
 
   def this(size: Int) = this(ByteBuffer.allocate(size))
@@ -40,20 +44,13 @@ private[kafka] class BoundedByteBufferSe
     buffer.rewind()
   }
   
-  def writeTo(channel: WritableByteChannel): Int = {
+  def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete()
-    var written = 0
-    // try to write the size if we haven't already
-    if(sizeBuffer.hasRemaining)
-      written += channel.write(sizeBuffer)
-    // try to write the actual buffer itself
-    if(!sizeBuffer.hasRemaining && buffer.hasRemaining)
-      written += channel.write(buffer)
+    var written = channel.write(Array(sizeBuffer, buffer))
     // if we are done, mark it off
     if(!buffer.hasRemaining)
-      complete = true
-    
-    written
+      complete = true    
+    written.asInstanceOf[Int]
   }
     
 }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala?rev=1205313&r1=1205312&r2=1205313&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/ByteBufferSend.scala Wed Nov 23 07:29:37 2011
@@ -28,7 +28,7 @@ private[kafka] class ByteBufferSend(val 
 
   def this(size: Int) = this(ByteBuffer.allocate(size))
   
-  def writeTo(channel: WritableByteChannel): Int = {
+  def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete()
     var written = 0
     written += channel.write(buffer)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala?rev=1205313&r1=1205312&r2=1205313&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala Wed Nov 23 07:29:37 2011
@@ -65,9 +65,9 @@ private[kafka] trait Receive extends Tra
  */
 private[kafka] trait Send extends Transmission {
     
-  def writeTo(channel: WritableByteChannel): Int
+  def writeTo(channel: GatheringByteChannel): Int
   
-  def writeCompletely(channel: WritableByteChannel): Int = {
+  def writeCompletely(channel: GatheringByteChannel): Int = {
     var written = 0
     while(!complete) {
       written = writeTo(channel)
@@ -86,7 +86,7 @@ abstract class MultiSend[S <: Send](val 
   private var current = sends
   var totalWritten = 0
 
-  def writeTo(channel: WritableByteChannel): Int = {
+  def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete
     val written = current.head.writeTo(channel)
     totalWritten += written

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala?rev=1205313&r1=1205312&r2=1205313&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala Wed Nov 23 07:29:37 2011
@@ -44,7 +44,7 @@ private[server] class MessageSetSend(val
 
   def this() = this(MessageSet.Empty)
 
-  def writeTo(channel: WritableByteChannel): Int = {
+  def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete()
     var written = 0
     if(header.hasRemaining)



Mime
View raw message