kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1401760 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/javaapi/message/ main/scala/kafka/log/ main/scala/kafka/message/ main/scala/kafka/server/ test/scala/unit/kafka/javaapi/message/ test/scala/unit/ka...
Date Wed, 24 Oct 2012 16:19:27 GMT
Author: junrao
Date: Wed Oct 24 16:19:26 2012
New Revision: 1401760

URL: http://svn.apache.org/viewvc?rev=1401760&view=rev
Log:
Change MessageSet.sizeInBytes to Int; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-556

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Wed Oct
24 16:19:26 2012
@@ -47,7 +47,7 @@ object FetchResponsePartitionData {
 case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError,
                                       initialOffset:Long = 0L, hw: Long = -1L, messages:
MessageSet) {
 
-  val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes.intValue()
+  val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes
 
   def this(messages: MessageSet) = this(ErrorMapping.NoError, 0L, -1L, messages)
   
@@ -58,14 +58,14 @@ case class FetchResponsePartitionData(er
 class PartitionDataSend(val partitionId: Int,
                         val partitionData: FetchResponsePartitionData) extends Send {
   private val messageSize = partitionData.messages.sizeInBytes
-  private var messagesSentSize = 0L
+  private var messagesSentSize = 0
 
   private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize)
   buffer.putInt(partitionId)
   buffer.putShort(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
   buffer.putLong(partitionData.hw)
-  buffer.putInt(partitionData.messages.sizeInBytes.intValue())
+  buffer.putInt(partitionData.messages.sizeInBytes)
   buffer.rewind()
 
   override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
@@ -75,7 +75,7 @@ class PartitionDataSend(val partitionId:
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && messagesSentSize < messageSize) {
-      val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize
- messagesSentSize).toInt
+      val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize
- messagesSentSize)
       messagesSentSize += bytesSent
       written += bytesSent
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Wed Oct
24 16:19:26 2012
@@ -113,7 +113,7 @@ case class ProducerRequest(versionId: Sh
           foldedPartitions +
           4 + /* partition id */
           4 + /* byte-length of serialized messages */
-          currPartition._2.sizeInBytes.toInt
+          currPartition._2.sizeInBytes
         })
       }
     })

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
Wed Oct 24 16:19:26 2012
@@ -32,7 +32,7 @@ class ByteBufferMessageSet(@BeanProperty
     this(NoCompressionCodec, messages)
   }
 
-  def validBytes: Long = underlying.validBytes
+  def validBytes: Int = underlying.validBytes
 
   override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset]
{
     val underlyingIterator = underlying.iterator
@@ -49,7 +49,7 @@ class ByteBufferMessageSet(@BeanProperty
 
   override def toString: String = underlying.toString
 
-  def sizeInBytes: Long = underlying.sizeInBytes
+  def sizeInBytes: Int = underlying.sizeInBytes
 
   override def equals(other: Any): Boolean = {
     other match {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
Wed Oct 24 16:19:26 2012
@@ -38,7 +38,7 @@ abstract class MessageSet extends java.l
   /**
    * Gives the total size of this message set in bytes
    */
-  def sizeInBytes: Long
+  def sizeInBytes: Int
 
   /**
    * Validate the checksum of all the messages in the set. Throws an InvalidMessageException
if the checksum doesn't

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala Wed Oct
24 16:19:26 2012
@@ -36,12 +36,12 @@ import kafka.metrics.{KafkaTimer, KafkaM
 @nonthreadsafe
 class FileMessageSet private[kafka](val file: File,
                                     private[log] val channel: FileChannel,
-                                    private[log] val start: Long = 0L,
-                                    private[log] val limit: Long = Long.MaxValue,
+                                    private[log] val start: Int = 0,
+                                    private[log] val limit: Int = Int.MaxValue,
                                     initChannelPositionToEnd: Boolean = true) extends MessageSet
with Logging {
   
   /* the size of the message set in bytes */
-  private val _size = new AtomicLong(scala.math.min(channel.size(), limit) - start)
+  private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
 
   if (initChannelPositionToEnd) {
     /* set the file position to the last byte in the file */
@@ -51,7 +51,7 @@ class FileMessageSet private[kafka](val 
   /**
    * Create a file message set with no limit or offset
    */
-  def this(file: File, channel: FileChannel) = this(file, channel, 0, Long.MaxValue)
+  def this(file: File, channel: FileChannel) = this(file, channel, 0, Int.MaxValue)
   
   /**
    * Create a file message set with no limit or offset
@@ -61,7 +61,7 @@ class FileMessageSet private[kafka](val 
   /**
    * Return a message set which is a view into this set starting from the given position
and with the given size limit.
    */
-  def read(position: Long, size: Long): FileMessageSet = {
+  def read(position: Int, size: Int): FileMessageSet = {
     new FileMessageSet(file,
                        channel,
                        this.start + position,
@@ -96,8 +96,8 @@ class FileMessageSet private[kafka](val 
   /**
    * Write some of this set to the given channel, return the amount written
    */
-  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Long): Long =

-    channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel)
+  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
+    channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
   
   /**
    * Get an iterator over the messages in the set. We only do shallow iteration here.
@@ -136,7 +136,7 @@ class FileMessageSet private[kafka](val 
   /**
    * The number of bytes taken up by this file set
    */
-  def sizeInBytes(): Long = _size.get()
+  def sizeInBytes(): Int = _size.get()
   
   /**
    * Append this message to the message set
@@ -175,7 +175,7 @@ class FileMessageSet private[kafka](val 
    * Truncate this file message set to the given size. Note that this API does no checking
that the 
    * given size falls on a valid byte offset.
    */
-  def truncateTo(targetSize: Long) = {
+  def truncateTo(targetSize: Int) = {
     if(targetSize > sizeInBytes())
       throw new KafkaException("Attempt to truncate log segment to %d bytes failed since
the current ".format(targetSize) +
         " size of this log segment is only %d bytes".format(sizeInBytes()))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Wed Oct 24 16:19:26
2012
@@ -99,7 +99,7 @@ object Log {
  */
 @threadsafe
 private[kafka] class Log(val dir: File, 
-                         val maxLogFileSize: Long, 
+                         val maxLogFileSize: Int,
                          val maxMessageSize: Int, 
                          val flushInterval: Int = Int.MaxValue,
                          val rollIntervalMs: Long = Long.MaxValue, 
@@ -337,14 +337,14 @@ private[kafka] class Log(val dir: File, 
    */
   private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
     val messageSetValidBytes = messages.validBytes
-    if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
+    if(messageSetValidBytes < 0)
       throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes
+ " Message set cannot be appended to log. Possible causes are corrupted produce requests")
     if(messageSetValidBytes == messages.sizeInBytes) {
       messages
     } else {
       // trim invalid bytes
       val validByteBuffer = messages.buffer.duplicate()
-      validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+      validByteBuffer.limit(messageSetValidBytes)
       new ByteBufferMessageSet(validByteBuffer)
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala Wed Oct 24
16:19:26 2012
@@ -59,7 +59,7 @@ class LogSegment(val messageSet: FileMes
       // append the messages
       messageSet.append(messages)
       updateFirstAppendTime()
-      this.bytesSinceLastIndexEntry += messages.sizeInBytes.toInt
+      this.bytesSinceLastIndexEntry += messages.sizeInBytes
     }
   }
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Wed Oct 24 16:19:26 2012
@@ -93,9 +93,7 @@ object ByteBufferMessageSet {
  * 
  */
 class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet with
Logging {
-  private var shallowValidByteCount = -1L
-  if(sizeInBytes > Int.MaxValue)
-    throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
+  private var shallowValidByteCount = -1
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
     this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages:_*))
@@ -109,7 +107,7 @@ class ByteBufferMessageSet(@BeanProperty
     this(NoCompressionCodec, new AtomicLong(0), messages: _*)
   }
 
-  private def shallowValidBytes: Long = {
+  private def shallowValidBytes: Int = {
     if(shallowValidByteCount < 0) {
       var bytes = 0
       val iter = this.internalIterator(true)
@@ -123,10 +121,10 @@ class ByteBufferMessageSet(@BeanProperty
   }
   
   /** Write the messages in this set to the given channel */
-  def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = {
+  def writeTo(channel: GatheringByteChannel, offset: Long, size: Int): Int = {
     // Ignore offset and size from input. We just want to write the whole buffer to the channel.
     buffer.mark()
-    var written = 0L
+    var written = 0
     while(written < sizeInBytes)
       written += channel.write(buffer)
     buffer.reset()
@@ -223,12 +221,12 @@ class ByteBufferMessageSet(@BeanProperty
   /**
    * The total number of bytes in this message set, including any partial trailing messages
    */
-  def sizeInBytes: Long = buffer.limit
+  def sizeInBytes: Int = buffer.limit
   
   /**
    * The total number of bytes in this message set not including any partial, trailing messages
    */
-  def validBytes: Long = shallowValidBytes
+  def validBytes: Int = shallowValidBytes
 
   /**
    * Two message sets are equal if their respective byte buffers are equal

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala Wed Oct
24 16:19:26 2012
@@ -69,7 +69,7 @@ abstract class MessageSet extends Iterab
   /** Write the messages in this set to the given channel starting at the given offset byte.

     * Less than the complete amount may be written, but no more than maxSize can be. The
number
     * of bytes written is returned */
-  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Long): Long
+  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
   
   /**
    * Provides an iterator over the message/offset pairs in this set
@@ -79,7 +79,7 @@ abstract class MessageSet extends Iterab
   /**
    * Gives the total size of this message set in bytes
    */
-  def sizeInBytes: Long
+  def sizeInBytes: Int
   
   /**
    * Validate the checksum of all the messages in the set. Throws an InvalidMessageException
if the checksum doesn't

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MessageSetSend.scala Wed
Oct 24 16:19:26 2012
@@ -31,10 +31,10 @@ import kafka.common.ErrorMapping
 @nonthreadsafe
 private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends
Send {
   
-  private var sent: Long = 0
-  private var size: Long = messages.sizeInBytes
+  private var sent: Int = 0
+  private val size: Int = messages.sizeInBytes
   private val header = ByteBuffer.allocate(6)
-  header.putInt(size.asInstanceOf[Int] + 2)
+  header.putInt(size + 2)
   header.putShort(errorCode)
   header.rewind()
   
@@ -51,7 +51,7 @@ private[server] class MessageSetSend(val
       written += channel.write(header)
     if(!header.hasRemaining) {
       val fileBytesSent = messages.writeTo(channel, sent, size - sent)
-      written += fileBytesSent.asInstanceOf[Int]
+      written += fileBytesSent
       sent += fileBytesSent
     }
 
@@ -66,6 +66,6 @@ private[server] class MessageSetSend(val
     written
   }
   
-  def sendSize: Int = size.asInstanceOf[Int] + header.capacity
+  def sendSize: Int = size + header.capacity
   
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
Wed Oct 24 16:19:26 2012
@@ -58,17 +58,17 @@ trait BaseMessageSetTestCases extends JU
   @Test
   def testSizeInBytes() {
     assertEquals("Empty message set should have 0 bytes.",
-                 0L,
+                 0,
                  createMessageSet(Array[Message]()).sizeInBytes)
     assertEquals("Predicted size should equal actual size.", 
-                 kafka.message.MessageSet.messageSetSize(messages).toLong,
+                 kafka.message.MessageSet.messageSetSize(messages),
                  createMessageSet(messages).sizeInBytes)
   }
 
   @Test
   def testSizeInBytesWithCompression () {
     assertEquals("Empty message set should have 0 bytes.",
-                 0L,           // overhead of the GZIP output stream
+                 0,           // overhead of the GZIP output stream
                  createMessageSet(Array[Message](), DefaultCompressionCodec).sizeInBytes)
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Wed
Oct 24 16:19:26 2012
@@ -92,7 +92,7 @@ class LogManagerTest extends JUnit3Suite
     time.currentMs += maxLogAge + 3000
     logManager.cleanupLogs()
     assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
-    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset+1, 1024).sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
@@ -111,7 +111,7 @@ class LogManagerTest extends JUnit3Suite
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
     config = new KafkaConfig(props) {
-      override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will
be 10 messages
+      override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
       override val logRetentionHours = retentionHours
       override val flushInterval = 100
@@ -138,7 +138,7 @@ class LogManagerTest extends JUnit3Suite
     // this cleanup shouldn't find any expired segments but should delete some to reduce
size
     logManager.cleanupLogs()
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
-    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset + 1, 1024).sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Wed Oct
24 16:19:26 2012
@@ -93,7 +93,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be
10 messages
+    val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
     // create a log
     val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery
= false, time = time)
@@ -132,7 +132,7 @@ class LogTest extends JUnitSuite {
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
     val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
-    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024,
1000).sizeInBytes)
+    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024,
1000).sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Expected exception on invalid read.")
@@ -250,7 +250,7 @@ class LogTest extends JUnitSuite {
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
-    val log = new Log(logDir, 100, maxMessageSize.toInt, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
+    val log = new Log(logDir, 100, maxMessageSize, 1000, config.logRollHours*60*60*1000L,
needsRecovery = false, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -297,7 +297,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be
10 messages
+    val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
     // create a log
     val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery
= false, time = time)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
Wed Oct 24 16:19:26 2012
@@ -46,10 +46,10 @@ trait BaseMessageSetTestCases extends JU
   @Test
   def testSizeInBytes() {
     assertEquals("Empty message set should have 0 bytes.",
-                 0L,
+                 0,
                  createMessageSet(Array[Message]()).sizeInBytes)
     assertEquals("Predicted size should equal actual size.", 
-                 MessageSet.messageSetSize(messages).toLong, 
+                 MessageSet.messageSetSize(messages),
                  createMessageSet(messages).sizeInBytes)
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1401760&r1=1401759&r2=1401760&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
Wed Oct 24 16:19:26 2012
@@ -32,7 +32,7 @@ class ByteBufferMessageSetTest extends B
   def testValidBytes() {
     {
       val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()),
new Message("there".getBytes()))
-      val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
+      val buffer = ByteBuffer.allocate(messages.sizeInBytes + 2)
       buffer.put(messages.buffer)
       buffer.putShort(4)
       val messagesPlus = new ByteBufferMessageSet(buffer)
@@ -50,7 +50,7 @@ class ByteBufferMessageSetTest extends B
   def testValidBytesWithCompression() {
     {
       val messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()),
new Message("there".getBytes()))
-      val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
+      val buffer = ByteBuffer.allocate(messages.sizeInBytes + 2)
       buffer.put(messages.buffer)
       buffer.putShort(4)
       val messagesPlus = new ByteBufferMessageSet(buffer)



Mime
View raw message