Author: nehanarkhede Date: Fri Aug 19 00:51:20 2011 New Revision: 1159461 URL: http://svn.apache.org/viewvc?rev=1159461&view=rev Log: Some new unit tests for ByteBufferMessageSet iterator KAFKA-108; patched by Jun; reviewed by Neha Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1159461&r1=1159460&r2=1159461&view=diff ============================================================================== --- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original) +++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Fri Aug 19 00:51:20 2011 @@ -20,6 +20,7 @@ package kafka.message import java.nio._ import junit.framework.Assert._ import org.junit.Test +import kafka.utils.TestUtils class ByteBufferMessageSetTest extends BaseMessageSetTestCases { @@ -49,4 +50,68 @@ class ByteBufferMessageSetTest extends B assertTrue(messages.equals(moreMessages)) } + + @Test + def testIterator() { + val messageList = List( + new Message("msg1".getBytes), + new Message("msg2".getBytes), + new Message("msg3".getBytes) + ) + + // test for uncompressed regular messages + { + val messageSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*) + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) + //make sure ByteBufferMessageSet is re-iterable. + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) + //make sure the last offset after iteration is correct + assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit) + } + + // test for compressed regular messages + { + val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*) + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) + //make sure ByteBufferMessageSet is re-iterable. + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) + //make sure the last offset after iteration is correct + assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit) + } + + // test for mixed empty and non-empty messagesets uncompressed + { + val emptyMessageList : List[Message] = Nil + val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*) + val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*) + val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit) + buffer.put(emptyMessageSet.serialized) + buffer.put(regularMessgeSet.serialized) + buffer.rewind + val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) + //make sure ByteBufferMessageSet is re-iterable. + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) + //make sure the last offset after iteration is correct + assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit) + } + + // test for mixed empty and non-empty messagesets compressed + { + val emptyMessageList : List[Message] = Nil + val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*) + val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*) + val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit) + buffer.put(emptyMessageSet.serialized) + buffer.put(regularMessgeSet.serialized) + buffer.rewind + val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) + //make sure ByteBufferMessageSet is re-iterable. + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) + //make sure the last offset after iteration is correct + assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit) + } + } + } Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1159461&r1=1159460&r2=1159461&view=diff ============================================================================== --- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original) +++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Fri Aug 19 00:51:20 2011 @@ -288,6 +288,18 @@ object TestUtils { ZkUtils.updatePersistentPath(zkClient, path, offset.toString) } + + def getMessageIterator(iter: Iterator[MessageAndOffset]): Iterator[Message] = { + new IteratorTemplate[Message] { + override def makeNext(): Message = { + if (iter.hasNext) + return iter.next.message + else + return allDone() + } + } + } + } object TestZKUtils {