kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1395729 [3/4] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/...
Date Mon, 08 Oct 2012 19:13:27 GMT
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import org.junit._
+import org.scalatest.junit.JUnitSuite
+import junit.framework.Assert._
+import java.nio.ByteBuffer
+import kafka.api._
+import kafka.message.{Message, ByteBufferMessageSet}
+import kafka.cluster.Broker
+import collection.mutable._
+import kafka.common.{TopicAndPartition, ErrorMapping}
+
+
+object SerializationTestUtils{
+  private val topic1 = "test1"
+  private val topic2 = "test2"
+  private val leader1 = 0
+  private val isr1 = List(0, 1, 2)
+  private val leader2 = 0
+  private val isr2 = List(0, 2, 3)
+  private val partitionDataFetchResponse0 = new FetchResponsePartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
+  private val partitionDataFetchResponse1 = new FetchResponsePartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
+  private val partitionDataFetchResponse2 = new FetchResponsePartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
+  private val partitionDataFetchResponse3 = new FetchResponsePartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
+  private val partitionDataFetchResponseArray = Array(partitionDataFetchResponse0, partitionDataFetchResponse1, partitionDataFetchResponse2, partitionDataFetchResponse3)
+
+  private val topicDataFetchResponse = {
+    val groupedData = Array(topic1, topic2).flatMap(topic =>
+      partitionDataFetchResponseArray.map(partitionData =>
+        (TopicAndPartition(topic, partitionData.partition), partitionData)))
+    collection.immutable.Map(groupedData:_*)
+  }
+
+  private val partitionDataProducerRequest0 = new ProducerRequestPartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
+  private val partitionDataProducerRequest1 = new ProducerRequestPartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
+  private val partitionDataProducerRequest2 = new ProducerRequestPartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
+  private val partitionDataProducerRequest3 = new ProducerRequestPartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
+  private val partitionDataProducerRequestArray = Array(partitionDataProducerRequest0, partitionDataProducerRequest1, partitionDataProducerRequest2, partitionDataProducerRequest3)
+
+  private val topicDataProducerRequest = {
+    val groupedData = Array(topic1, topic2).flatMap(topic =>
+      partitionDataProducerRequestArray.map(partitionData =>
+        (TopicAndPartition(topic, partitionData.partition), partitionData)))
+    collection.immutable.Map(groupedData:_*)
+  }
+
+  private val requestInfos = collection.immutable.Map(
+    TopicAndPartition(topic1, 0) -> PartitionFetchInfo(1000, 100),
+    TopicAndPartition(topic1, 1) -> PartitionFetchInfo(2000, 100),
+    TopicAndPartition(topic1, 2) -> PartitionFetchInfo(3000, 100),
+    TopicAndPartition(topic1, 3) -> PartitionFetchInfo(4000, 100),
+    TopicAndPartition(topic2, 0) -> PartitionFetchInfo(1000, 100),
+    TopicAndPartition(topic2, 1) -> PartitionFetchInfo(2000, 100),
+    TopicAndPartition(topic2, 2) -> PartitionFetchInfo(3000, 100),
+    TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
+  )
+
+  private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
+  private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
+  private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
+
+  def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = {
+    val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1)
+    val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2)
+    val map = Map(((topic1, 0), leaderAndISR1),
+                  ((topic2, 0), leaderAndISR2))
+    new LeaderAndIsrRequest(map)
+  }
+
+  def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
+    val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
+                          ((topic2, 0), ErrorMapping.NoError))
+    new LeaderAndISRResponse(1, responseMap)
+  }
+
+  def createTestStopReplicaRequest() : StopReplicaRequest = {
+    new StopReplicaRequest(Set((topic1, 0), (topic2, 0)))
+  }
+
+  def createTestStopReplicaResponse() : StopReplicaResponse = {
+    val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
+                          ((topic2, 0), ErrorMapping.NoError))
+    new StopReplicaResponse(1, responseMap)
+  }
+
+  def createTestProducerRequest: ProducerRequest = {
+    new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)
+  }
+
+  def createTestProducerResponse: ProducerResponse =
+    ProducerResponse(1, 1, Map(
+      TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001),
+      TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001)
+    ))
+
+  def createTestFetchRequest: FetchRequest = {
+    new FetchRequest(requestInfo = requestInfos)
+  }
+
+  def createTestFetchResponse: FetchResponse = {
+    FetchResponse(1, 1, topicDataFetchResponse)
+  }
+
+  def createTestOffsetRequest = new OffsetRequest(
+      collection.immutable.Map(TopicAndPartition(topic1, 1) -> PartitionOffsetRequestInfo(1000, 200)),
+      replicaId = 0
+  )
+
+  def createTestOffsetResponse: OffsetResponse = {
+    new OffsetResponse(OffsetRequest.CurrentVersion, collection.immutable.Map(
+      TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(ErrorMapping.NoError, Seq(1000l, 2000l, 3000l, 4000l)))
+    )
+  }
+
+  def createTestTopicMetadataRequest: TopicMetadataRequest = {
+    new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2))
+  }
+
+  def createTestTopicMetadataResponse: TopicMetadataResponse = {
+    new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2))
+  }
+}
+
+class RequestResponseSerializationTest extends JUnitSuite {
+  private val leaderAndISRRequest = SerializationTestUtils.createTestLeaderAndISRRequest
+  private val leaderAndISRResponse = SerializationTestUtils.createTestLeaderAndISRResponse
+  private val stopReplicaRequest = SerializationTestUtils.createTestStopReplicaRequest
+  private val stopReplicaResponse = SerializationTestUtils.createTestStopReplicaResponse
+  private val producerRequest = SerializationTestUtils.createTestProducerRequest
+  private val producerResponse = SerializationTestUtils.createTestProducerResponse
+  private val fetchRequest = SerializationTestUtils.createTestFetchRequest
+  private val offsetRequest = SerializationTestUtils.createTestOffsetRequest
+  private val offsetResponse = SerializationTestUtils.createTestOffsetResponse
+  private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest
+  private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse
+
+
+  @Test
+  def testSerializationAndDeserialization() {
+    var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes())
+    leaderAndISRRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedLeaderAndISRRequest = LeaderAndIsrRequest.readFrom(buffer)
+    assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndISRRequest,
+                 deserializedLeaderAndISRRequest)
+
+    buffer = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes())
+    leaderAndISRResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(buffer)
+    assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndISRResponse,
+                 deserializedLeaderAndISRResponse)
+
+    buffer = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes())
+    stopReplicaRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(buffer)
+    assertEquals("The original and deserialzed stopReplicaRequest should be the same", stopReplicaRequest,
+                 deserializedStopReplicaRequest)
+
+    buffer = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes())
+    stopReplicaResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(buffer)
+    assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse,
+                 deserializedStopReplicaResponse)
+
+    buffer = ByteBuffer.allocate(producerRequest.sizeInBytes)
+    producerRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedProducerRequest = ProducerRequest.readFrom(buffer)
+    assertEquals("The original and deserialzed producerRequest should be the same", producerRequest,
+                 deserializedProducerRequest)
+
+    buffer = ByteBuffer.allocate(producerResponse.sizeInBytes)
+    producerResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedProducerResponse = ProducerResponse.readFrom(buffer)
+    assertEquals("The original and deserialzed producerResponse should be the same: [%s], [%s]".format(producerResponse, deserializedProducerResponse), producerResponse,
+                 deserializedProducerResponse)
+
+    buffer = ByteBuffer.allocate(fetchRequest.sizeInBytes)
+    fetchRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedFetchRequest = FetchRequest.readFrom(buffer)
+    assertEquals("The original and deserialzed fetchRequest should be the same", fetchRequest,
+                 deserializedFetchRequest)
+
+    buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes)
+    offsetRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetRequest = OffsetRequest.readFrom(buffer)
+    assertEquals("The original and deserialzed offsetRequest should be the same", offsetRequest,
+                 deserializedOffsetRequest)
+
+    buffer = ByteBuffer.allocate(offsetResponse.sizeInBytes)
+    offsetResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetResponse = OffsetResponse.readFrom(buffer)
+    assertEquals("The original and deserialzed offsetResponse should be the same", offsetResponse,
+                 deserializedOffsetResponse)
+
+    buffer = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes())
+    topicMetadataRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedTopicMetadataRequest = TopicMetadataRequest.readFrom(buffer)
+    assertEquals("The original and deserialzed topicMetadataRequest should be the same", topicMetadataRequest,
+                 deserializedTopicMetadataRequest)
+
+    buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes)
+    topicMetadataResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
+    assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
+                 deserializedTopicMetadataResponse)
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Mon Oct  8 19:13:24 2012
@@ -99,8 +99,7 @@ class ZookeeperConsumerConnectorTest ext
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
     // create a consumer
-    val consumerConfig1 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
 
@@ -348,7 +347,7 @@ class ZookeeperConsumerConnectorTest ext
 
     val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
     assertEquals(nMessages, receivedMessages1.size)
-    assertEquals(sentMessages1, receivedMessages1)
+    assertEquals(sentMessages1.sortWith((s,t) => s.checksum < t.checksum), receivedMessages1)
   }
 
   def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, numMessages: Int, compression: CompressionCodec = NoCompressionCodec): List[Message] = {
@@ -395,10 +394,10 @@ class ZookeeperConsumerConnectorTest ext
 
   def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaStream[Message]]]): List[Message]= {
     var messages: List[Message] = Nil
-    for ((topic, messageStreams) <- topicMessageStreams) {
+    for((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator
-        for (i <- 0 until nMessagesPerThread) {
+        for(i <- 0 until nMessagesPerThread) {
           assertTrue(iterator.hasNext)
           val message = iterator.next.message
           messages ::= message

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Mon Oct  8 19:13:24 2012
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.consumer
+package kafka.integration
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
@@ -26,7 +26,7 @@ import kafka.cluster._
 import kafka.message._
 import kafka.server._
 import org.scalatest.junit.JUnit3Suite
-import kafka.integration.KafkaServerTestHarness
+import kafka.consumer._
 import kafka.producer.{ProducerData, Producer}
 import kafka.utils.TestUtils._
 import kafka.utils.TestUtils

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Mon Oct  8 19:13:24 2012
@@ -103,7 +103,7 @@ class LazyInitProducerTest extends JUnit
 
     // send some invalid offsets
     val builder = new FetchRequestBuilder()
-    for( (topic, offset) <- topicOffsets )
+    for((topic, offset) <- topicOffsets)
       builder.addFetch(topic, offset, -1, 10000)
 
     val request = builder.build()
@@ -113,8 +113,7 @@ class LazyInitProducerTest extends JUnit
         ErrorMapping.maybeThrowException(pd.error)
         fail("Expected an OffsetOutOfRangeException exception to be thrown")
       } catch {
-        case e: OffsetOutOfRangeException =>
-
+        case e: OffsetOutOfRangeException => // this is good
       }
     })
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala Mon Oct  8 19:13:24 2012
@@ -22,8 +22,9 @@ import org.junit.Test
 import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
 
 class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestCases {
+  
   override def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet =
-    new ByteBufferMessageSet(new kafka.message.ByteBufferMessageSet(compressed, messages: _*))
+    new ByteBufferMessageSet(new kafka.message.ByteBufferMessageSet(compressed, messages: _*).buffer)
 
   val msgSeq: Seq[Message] = Seq(new Message("hello".getBytes()), new Message("there".getBytes()))
 

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.nio._
+import java.util.concurrent.atomic._
+import junit.framework.Assert._
+import kafka.utils.TestUtils._
+import kafka.message._
+import org.junit.Test
+
+class FileMessageSetTest extends BaseMessageSetTestCases {
+  
+  val messageSet = createMessageSet(messages)
+  
+  def createMessageSet(messages: Seq[Message]): FileMessageSet = {
+    val set = new FileMessageSet(tempFile(), true)
+    set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
+    set.flush()
+    set
+  }
+
+  @Test
+  def testFileSize() {
+    assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
+    messageSet.append(singleMessageSet("abcd".getBytes()))
+    assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
+  }
+  
+  @Test
+  def testIterationOverPartialAndTruncation() {
+    testPartialWrite(0, messageSet)
+    testPartialWrite(2, messageSet)
+    testPartialWrite(4, messageSet)
+    testPartialWrite(5, messageSet)
+    testPartialWrite(6, messageSet)
+  }
+  
+  def testPartialWrite(size: Int, messageSet: FileMessageSet) {
+    val buffer = ByteBuffer.allocate(size)
+    val originalPosition = messageSet.channel.position
+    for(i <- 0 until size)
+      buffer.put(0.asInstanceOf[Byte])
+    buffer.rewind()
+    messageSet.channel.write(buffer)
+    // appending those bytes should not change the contents
+    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+  }
+  
+  @Test
+  def testIterationDoesntChangePosition() {
+    val position = messageSet.channel.position
+    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+    assertEquals(position, messageSet.channel.position)
+  }
+  
+  @Test
+  def testRead() {
+    val read = messageSet.read(0, messageSet.sizeInBytes)
+    checkEquals(messageSet.iterator, read.iterator)
+    val items = read.iterator.toList
+    val sec = items.tail.head
+    val read2 = messageSet.read(MessageSet.entrySize(sec.message), messageSet.sizeInBytes)
+    checkEquals(items.tail.iterator, read2.iterator)
+  }
+  
+  @Test
+  def testSearch() {
+    // append a new message with a high offset
+    val lastMessage = new Message("test".getBytes)
+    messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(50), lastMessage))
+    var physicalOffset = 0
+    assertEquals("Should be able to find the first message by its offset", 
+                 OffsetPosition(0L, physicalOffset), 
+                 messageSet.searchFor(0, 0))
+    physicalOffset += MessageSet.entrySize(messageSet.head.message)
+    assertEquals("Should be able to find second message when starting from 0", 
+                 OffsetPosition(1L, physicalOffset), 
+                 messageSet.searchFor(1, 0))
+    assertEquals("Should be able to find second message starting from its offset", 
+                 OffsetPosition(1L, physicalOffset), 
+                 messageSet.searchFor(1, physicalOffset))
+    physicalOffset += MessageSet.entrySize(messageSet.tail.head.message)
+    assertEquals("Should be able to find third message from a non-existant offset", 
+                 OffsetPosition(50L, physicalOffset), 
+                 messageSet.searchFor(3, physicalOffset))
+    assertEquals("Should be able to find third message by correct offset", 
+                 OffsetPosition(50L, physicalOffset), 
+                 messageSet.searchFor(50, physicalOffset))
+  }
+  
+}

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Mon Oct  8 19:13:24 2012
@@ -27,7 +27,7 @@ import kafka.admin.CreateTopicCommand
 import kafka.server.KafkaConfig
 import kafka.utils._
 
-class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
+class LogManagerTest extends JUnit3Suite {
 
   val time: MockTime = new MockTime()
   val maxRollInterval = 100
@@ -35,15 +35,13 @@ class LogManagerTest extends JUnit3Suite
   var logDir: File = null
   var logManager: LogManager = null
   var config:KafkaConfig = null
-  val zookeeperConnect = TestZKUtils.zookeeperConnect
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L
   val scheduler = new KafkaScheduler(2)
 
   override def setUp() {
     super.setUp()
-    val props = TestUtils.createBrokerConfig(0, -1)
-    config = new KafkaConfig(props) {
+    config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
                    override val logFileSize = 1024
                    override val flushInterval = 100
                  }
@@ -51,11 +49,6 @@ class LogManagerTest extends JUnit3Suite
     logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
     logDir = logManager.logDir
-
-    TestUtils.createBrokersInZk(zkClient, List(config.brokerId))
-
-    // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zkClient, name, 3, 1, "0,0,0")
   }
 
   override def tearDown() {
@@ -87,8 +80,8 @@ class LogManagerTest extends JUnit3Suite
     var offset = 0L
     for(i <- 0 until 1000) {
       var set = TestUtils.singleMessageSet("test".getBytes())
-      log.append(set)
-      offset += set.sizeInBytes
+      val (start, end) = log.append(set)
+      offset = end
     }
     log.flush
 
@@ -96,12 +89,12 @@ class LogManagerTest extends JUnit3Suite
 
     // update the last modified time of all log segments
     val logSegments = log.segments.view
-    logSegments.foreach(s => s.file.setLastModified(time.currentMs))
+    logSegments.foreach(s => s.messageSet.file.setLastModified(time.currentMs))
 
     time.currentMs += maxLogAge + 3000
     logManager.cleanupLogs()
     assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
-    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset+1, 1024).sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
@@ -135,8 +128,8 @@ class LogManagerTest extends JUnit3Suite
     // add a bunch of messages that should be larger than the retentionSize
     for(i <- 0 until 1000) {
       val set = TestUtils.singleMessageSet("test".getBytes())
-      log.append(set)
-      offset += set.sizeInBytes
+      val (start, end) = log.append(set)
+      offset = start
     }
     // flush to make sure it's written to disk
     log.flush
@@ -147,7 +140,7 @@ class LogManagerTest extends JUnit3Suite
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
     logManager.cleanupLogs()
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
-    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset + 1, 1024).sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
@@ -175,8 +168,8 @@ class LogManagerTest extends JUnit3Suite
       var set = TestUtils.singleMessageSet("test".getBytes())
       log.append(set)
     }
-
+    println("now = " + System.currentTimeMillis + " last flush = " + log.getLastFlushedTime)
     assertTrue("The last flush time has to be within defaultflushInterval of current time ",
-                     (System.currentTimeMillis - log.getLastFlushedTime) < 100)
+                     (System.currentTimeMillis - log.getLastFlushedTime) < 150)
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Mon Oct  8 19:13:24 2012
@@ -92,7 +92,7 @@ class LogOffsetTest extends JUnit3Suite 
     log.flush()
 
     val offsets = log.getOffsetsBefore(OffsetRequest.LatestTime, 10)
-    assertEquals(Seq(240L, 216L, 108L, 0L), offsets)
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
     val topicAndPartition = TopicAndPartition(topic, part)
@@ -101,7 +101,7 @@ class LogOffsetTest extends JUnit3Suite 
       replicaId = 0)
     val consumerOffsets =
       simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-    assertEquals(Seq(240L, 216L, 108L, 0L), consumerOffsets)
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
 
     // try to fetch using latest offset
     val fetchResponse = simpleConsumer.fetch(
@@ -157,14 +157,14 @@ class LogOffsetTest extends JUnit3Suite 
     val now = time.milliseconds
 
     val offsets = log.getOffsetsBefore(now, 10)
-    assertEquals(Seq(240L, 216L, 108L, 0L), offsets)
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
     val consumerOffsets =
       simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-    assertEquals(Seq(240L, 216L, 108L, 0L), consumerOffsets)
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
   }
 
   @Test

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,105 @@
+package kafka.log
+
+import junit.framework.Assert._
+import java.util.concurrent.atomic._
+import org.junit.{Test, Before, After}
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.TestUtils
+import kafka.message._
+import kafka.utils.SystemTime
+import scala.collection._
+
+class LogSegmentTest extends JUnit3Suite {
+  
+  val segments = mutable.ArrayBuffer[LogSegment]()
+  
+  def createSegment(offset: Long): LogSegment = {
+    val msFile = TestUtils.tempFile()
+    val ms = new FileMessageSet(msFile, true)
+    val idxFile = TestUtils.tempFile()
+    idxFile.delete()
+    val idx = new OffsetIndex(idxFile, offset, true, 100)
+    val seg = new LogSegment(ms, idx, offset, 10, SystemTime)
+    segments += seg
+    seg
+  }
+  
+  def messages(offset: Long, messages: String*): ByteBufferMessageSet = {
+    new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, 
+                             offsetCounter = new AtomicLong(offset), 
+                             messages = messages.map(s => new Message(s.getBytes)):_*)
+  }
+  
+  @After
+  def teardown() {
+    for(seg <- segments) {
+      seg.index.delete()
+      seg.messageSet.delete()
+    }
+  }
+  
+  @Test
+  def testReadOnEmptySegment() {
+    val seg = createSegment(40)
+    val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None)
+    assertEquals(0, read.size)
+  }
+  
+  @Test
+  def testReadBeforeFirstOffset() {
+    val seg = createSegment(40)
+    val ms = messages(50, "hello", "there", "little", "bee")
+    seg.append(50, ms)
+    val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None)
+    assertEquals(ms.toList, read.toList)
+  }
+  
+  @Test
+  def testReadSingleMessage() {
+    val seg = createSegment(40)
+    val ms = messages(50, "hello", "there")
+    seg.append(50, ms)
+    val read = seg.read(startOffset = 41, maxSize = 200, maxOffset = Some(50))
+    assertEquals(new Message("hello".getBytes), read.head.message)
+  }
+  
+  @Test
+  def testReadAfterLast() {
+    val seg = createSegment(40)
+    val ms = messages(50, "hello", "there")
+    seg.append(50, ms)
+    val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
+    assertEquals(0, read.size)
+  }
+  
+  @Test
+  def testReadFromGap() {
+    val seg = createSegment(40)
+    val ms = messages(50, "hello", "there")
+    seg.append(50, ms)
+    val ms2 = messages(60, "alpha", "beta")
+    seg.append(60, ms2)
+    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
+    assertEquals(ms2.toList, read.toList)
+  }
+  
+  @Test
+  def testTruncate() {
+    val seg = createSegment(40)
+    val ms = messages(50, "hello", "there", "you")
+    seg.append(50, ms)
+    seg.truncateTo(51)
+    val read = seg.read(50, maxSize = 1000, None)
+    assertEquals(1, read.size)
+    assertEquals(ms.head, read.head)
+  }
+  
+  @Test
+  def testNextOffsetCalculation() {
+    val seg = createSegment(40)
+    assertEquals(40, seg.nextOffset)
+    seg.append(50, messages(50, "hello", "there", "you"))
+    assertEquals(53, seg.nextOffset())
+  }
+  
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Mon Oct  8 19:13:24 2012
@@ -22,7 +22,7 @@ import java.util.ArrayList
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.message._
 import kafka.common.{MessageSizeTooLargeException, KafkaException, OffsetOutOfRangeException}
 import kafka.utils._
 import scala.Some
@@ -46,9 +46,11 @@ class LogTest extends JUnitSuite {
     Utils.rm(logDir)
   }
   
-  def createEmptyLogs(dir: File, offsets: Int*) = {
-    for(offset <- offsets)
-      new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
+  def createEmptyLogs(dir: File, offsets: Int*) {
+    for(offset <- offsets) {
+      Log.logFilename(dir, offset).createNewFile()
+      Log.indexFilename(dir, offset).createNewFile()
+    }
   }
 
   /** Test that the size and time based log segment rollout works. */
@@ -59,7 +61,7 @@ class LogTest extends JUnitSuite {
     val time: MockTime = new MockTime()
 
     // create a log
-    val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, false, time)
+    val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time)
     time.currentMs += rollMs + 1
 
     // segment age is less than its limit
@@ -76,12 +78,12 @@ class LogTest extends JUnitSuite {
 
     time.currentMs += rollMs + 1
     val blank = Array[Message]()
-    log.append(new ByteBufferMessageSet(blank:_*))
+    log.append(new ByteBufferMessageSet(new Message("blah".getBytes)))
     assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
 
     time.currentMs += rollMs + 1
     // the last segment expired in age, but was blank. So new segment should not be generated
-    log.append(set)
+    log.append(new ByteBufferMessageSet())
     assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
   }
 
@@ -93,7 +95,7 @@ class LogTest extends JUnitSuite {
     val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, false, time)
+    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -106,23 +108,12 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
-  }
-
-  @Test
-  def testLoadInvalidLogsFails() {
-    createEmptyLogs(logDir, 0, 15)
-    try {
-      new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
-      fail("Allowed load of corrupt logs without complaint.")
-    } catch {
-      case e: KafkaException => "This is good"
-    }
+    new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
   }
 
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
+    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -139,7 +130,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
+    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -159,95 +150,96 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
+    val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
     val numMessages = 100
-    for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
+    val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
+    val offsets = messageSets.map(log.append(_)._1)
     log.flush
 
-    /* now do successive reads and iterate over the resulting message sets counting the messages
-     * we should find exact 100 messages.
-     */
-    var reads = 0
-    var current = 0
+    /* do successive reads to ensure all our messages are there */
     var offset = 0L
-    var readOffset = 0L
-    while(current < numMessages) {
-      val messages = log.read(readOffset, 1024*1024)
-      readOffset += messages.last.offset
-      current += messages.size
-      if(reads > 2*numMessages)
-        fail("Too many read attempts.")
-      reads += 1
+    for(i <- 0 until numMessages) {
+      val messages = log.read(offset, 1024*1024)
+      assertEquals("Offsets not equal", offset, messages.head.offset)
+      assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message, messages.head.message)
+      offset = messages.head.offset + 1
     }
-    assertEquals("We did not find all the messages we put in", numMessages, current)
+    val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1))
+    assertEquals("Should be no more messages", 0, lastRead.size)
+  }
+  
+  /** Test the case where we have compressed batches of messages */
+  @Test
+  def testCompressedMessages() {
+    /* this log should roll after every messageset */
+    val log = new Log(logDir, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    
+    /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
+    log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
+    log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
+    
+    def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message)
+    
+    /* we should always get the first message in the compressed set when reading any offset in the set */
+    assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
+    assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
+    assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
+    assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
   }
 
   @Test
   def testFindSegment() {
     assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
-    assertEquals("Search in segment list just outside the range of the last segment should find nothing",
-                 None, Log.findRange(makeRanges(5, 9, 12), 12))
-    try {
-      Log.findRange(makeRanges(35), 36)
-      fail("expect exception")
-    }
-    catch {
-      case e: OffsetOutOfRangeException => "this is good"
-    }
-
-    try {
-      Log.findRange(makeRanges(35,35), 36)
-    }
-    catch {
-      case e: OffsetOutOfRangeException => "this is good"
-    }
-
+    assertEquals("Search in segment list just outside the range of the last segment should find last segment",
+                 9, Log.findRange(makeRanges(5, 9, 12), 12).get.start)
+    assertEquals("Search in segment list far outside the range of the last segment should find last segment",
+                 9, Log.findRange(makeRanges(5, 9, 12), 100).get.start)
+    assertEquals("Search in segment list far outside the range of the last segment should find last segment",
+                 None, Log.findRange(makeRanges(5, 9, 12), -1))
     assertContains(makeRanges(5, 9, 12), 11)
     assertContains(makeRanges(5), 4)
     assertContains(makeRanges(5,8), 5)
     assertContains(makeRanges(5,8), 6)
   }
   
-  /** Test corner cases of rolling logs */
   @Test
-  def testEdgeLogRolls() {
-    {
-      // first test a log segment starting at 0
-      val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
-      val curOffset = log.logEndOffset
-      assertEquals(curOffset, 0)
+  def testEdgeLogRollsStartingAtZero() {
+    // first test a log segment starting at 0
+    val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val curOffset = log.logEndOffset
+    assertEquals(curOffset, 0)
 
-      // time goes by; the log file is deleted
-      log.markDeletedWhile(_ => true)
+    // time goes by; the log file is deleted
+    log.markDeletedWhile(_ => true)
 
-      // we now have a new log; the starting offset of the new log should remain 0
-      assertEquals(curOffset, log.logEndOffset)
-    }
+    // we now have a new log; the starting offset of the new log should remain 0
+    assertEquals(curOffset, log.logEndOffset)
+    log.delete()
+  }
 
-    {
-      // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
-      val numMessages = 1
-      for(i <- 0 until numMessages)
-        log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
-
-      val curOffset = log.logEndOffset
-      // time goes by; the log file is deleted
-      log.markDeletedWhile(_ => true)
-
-      // we now have a new log
-      assertEquals(curOffset, log.logEndOffset)
+  @Test
+  def testEdgeLogRollsStartingAtNonZero() {
+    // second test an empty log segment starting at non-zero
+    val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val numMessages = 1
+    for(i <- 0 until numMessages)
+      log.append(TestUtils.singleMessageSet(i.toString.getBytes))
+    val curOffset = log.logEndOffset
+    
+    // time goes by; the log file is deleted
+    log.markDeletedWhile(_ => true)
+
+    // we now have a new log
+    assertEquals(curOffset, log.logEndOffset)
 
-      // time goes by; the log file (which is empty) is deleted again
-      val deletedSegments = log.markDeletedWhile(_ => true)
+    // time goes by; the log file (which is empty) is deleted again
+    val deletedSegments = log.markDeletedWhile(_ => true)
 
-      // we shouldn't delete the last empty log segment.
-      assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
+    // we shouldn't delete the last empty log segment.
+    assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
 
-      // we now have a new log
-      assertEquals(curOffset, log.logEndOffset)
-    }
+    // we now have a new log
+    assertEquals(curOffset, log.logEndOffset)
   }
 
   @Test
@@ -256,27 +248,47 @@ class LogTest extends JUnitSuite {
     val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
 
     // append messages to log
-    val log = new Log(logDir, 100, 5, 1000, config.logRollHours*60*60*1000L, false, time)
+    val maxMessageSize = second.sizeInBytes - 1
+    val log = new Log(logDir, 100, maxMessageSize.toInt, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 
-    var ret =
-      try {
-        log.append(first)
-        true
-      }
-      catch {
-        case e: MessageSizeTooLargeException => false
-      }
-    assert(ret, "First messageset should pass.")
-
-    ret =
-      try {
-        log.append(second)
-        false
-      }
-      catch {
-        case e:MessageSizeTooLargeException => true
-      }
-    assert(ret, "Second message set should throw MessageSizeTooLargeException.")
+    // should be able to append the small message
+    log.append(first)
+
+    try {
+      log.append(second)
+      fail("Second message set should throw MessageSizeTooLargeException.")
+    } catch {
+        case e:MessageSizeTooLargeException => // this is good
+    }
+  }
+  
+  @Test
+  def testLogRecoversToCorrectOffset() {
+    val numMessages = 100
+    val messageSize = 100
+    val segmentSize = 7 * messageSize
+    val indexInterval = 3 * messageSize
+    var log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+    for(i <- 0 until numMessages)
+      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
+    assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
+    val lastIndexOffset = log.segments.view.last.index.lastOffset
+    val numIndexEntries = log.segments.view.last.index.entries
+    log.close()
+    
+    // test non-recovery case
+    log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+    assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
+    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
+    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
+    log.close()
+    
+    // test 
+    log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+    assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
+    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
+    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
+    log.close()
   }
 
   @Test
@@ -287,14 +299,15 @@ class LogTest extends JUnitSuite {
     val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, false, time)
+    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
-    for (i<- 1 to msgPerSeg) {
+    for (i<- 1 to msgPerSeg)
       log.append(set)
-    }
+    
     assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
-
+    assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
+    
     val lastOffset = log.logEndOffset
     val size = log.size
     log.truncateTo(log.logEndOffset) // keep the entire log
@@ -303,29 +316,29 @@ class LogTest extends JUnitSuite {
     log.truncateTo(log.logEndOffset + 1) // try to truncate beyond lastOffset
     assertEquals("Should not change offset but should log error", lastOffset, log.logEndOffset)
     assertEquals("Should not change log size", size, log.size)
-    log.truncateTo(log.logEndOffset - 10) // truncate somewhere in between
-    assertEquals("Should change offset", lastOffset, log.logEndOffset + 10)
-    assertEquals("Should change log size", size, log.size + 10)
-    log.truncateTo(log.logEndOffset - log.size) // truncate the entire log
-    assertEquals("Should change offset", log.logEndOffset, lastOffset - size)
-    assertEquals("Should change log size", log.size, 0)
+    log.truncateTo(msgPerSeg/2) // truncate somewhere in between
+    assertEquals("Should change offset", log.logEndOffset, msgPerSeg/2)
+    assertTrue("Should change log size", log.size < size)
+    log.truncateTo(0) // truncate the entire log
+    assertEquals("Should change offset", 0, log.logEndOffset)
+    assertEquals("Should change log size", 0, log.size)
 
-    for (i<- 1 to msgPerSeg) {
+    for (i<- 1 to msgPerSeg)
       log.append(set)
-    }
+    
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
     assertEquals("Should be back to original size", log.size, size)
-    log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1)*setSize)
-    assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1)*setSize)
+    log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1))
+    assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1))
     assertEquals("Should change log size", log.size, 0)
 
-    for (i<- 1 to msgPerSeg) {
+    for (i<- 1 to msgPerSeg)
       log.append(set)
-    }
-    assertEquals("Should be ahead of to original offset", log.logEndOffset, lastOffset + setSize)
+
+    assertTrue("Should be ahead of to original offset", log.logEndOffset > msgPerSeg)
     assertEquals("log size should be same as before", size, log.size)
-    log.truncateTo(log.logEndOffset - log.size - setSize) // truncate before first start offset in the log
-    assertEquals("Should change offset", log.logEndOffset, lastOffset - size)
+    log.truncateTo(0) // truncate before first start offset in the log
+    assertEquals("Should change offset", 0, log.logEndOffset)
     assertEquals("Should change log size", log.size, 0)
   }
 

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io._
+import junit.framework.Assert._
+import java.util.{Collections, Arrays}
+import org.junit._
+import org.scalatest.junit.JUnitSuite
+import scala.collection._
+import scala.util.Random
+import kafka.utils._
+
+class OffsetIndexTest extends JUnitSuite {
+  
+  var idx: OffsetIndex = null
+  val maxEntries = 30
+  
+  @Before
+  def setup() {
+    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, mutable = true, maxIndexSize = 30 * 8)
+  }
+  
+  @After
+  def teardown() {
+    if(this.idx != null)
+      this.idx.file.delete()
+  }
+
+  @Test
+  def randomLookupTest() {
+    assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset, 0), idx.lookup(92L))
+    
+    // append some random values
+    val base = idx.baseOffset.toInt + 1
+    val size = idx.maxEntries
+    val vals: Seq[(Long, Int)] = monotonicSeq(base, size).map(_.toLong).zip(monotonicSeq(0, size))
+    vals.foreach{x => idx.append(x._1, x._2)}
+    
+    // should be able to find all those values
+    for((logical, physical) <- vals)
+      assertEquals("Should be able to find values that are present.", OffsetPosition(logical, physical), idx.lookup(logical))
+      
+    // for non-present values we should find the offset of the largest value less than or equal to this 
+    val valMap = new immutable.TreeMap[Long, (Long, Int)]() ++ vals.map(p => (p._1, p))
+    val offsets = (idx.baseOffset until vals.last._1.toInt).toArray
+    Collections.shuffle(Arrays.asList(offsets))
+    for(offset <- offsets.take(30)) {
+      val rightAnswer = 
+        if(offset < valMap.firstKey)
+          OffsetPosition(idx.baseOffset, 0)
+        else
+          OffsetPosition(valMap.to(offset).last._1, valMap.to(offset).last._2._2)
+      assertEquals("The index should give the same answer as the sorted map", rightAnswer, idx.lookup(offset))
+    }
+  }
+  
+  @Test
+  def lookupExtremeCases() {
+    assertEquals("Lookup on empty file", OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset))
+    for(i <- 0 until idx.maxEntries)
+      idx.append(idx.baseOffset + i + 1, i)
+    // check first and last entry
+    assertEquals(OffsetPosition(idx.baseOffset, 0), idx.lookup(idx.baseOffset))
+    assertEquals(OffsetPosition(idx.baseOffset + idx.maxEntries, idx.maxEntries - 1), idx.lookup(idx.baseOffset + idx.maxEntries))
+  }
+  
+  @Test
+  def appendTooMany() {
+    for(i <- 0 until idx.maxEntries) {
+      val offset = idx.baseOffset + i + 1
+      idx.append(offset, i)
+    }
+    assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalStateException])
+  }
+
+  
+  @Test
+  def testReadOnly() {
+    /* add some random values */
+    val vals = List((49, 1), (52, 2), (55, 3))
+    for((logical, physical) <- vals)
+      idx.append(logical, physical)
+    
+    idx.makeReadOnly()
+    
+    assertEquals("File length should just contain added entries.", vals.size * 8L, idx.file.length())
+    assertEquals("Last offset field should be initialized", vals.last._1, idx.lastOffset)
+    
+    for((logical, physical) <- vals)
+    	assertEquals("Should still be able to find everything.", OffsetPosition(logical, physical), idx.lookup(logical))
+    	
+    assertWriteFails("Append should fail on read-only index", idx, 60, classOf[IllegalStateException])
+  }
+  
+  @Test(expected = classOf[IllegalArgumentException])
+  def appendOutOfOrder() {
+    idx.append(51, 0)
+    idx.append(50, 1)
+  }
+  
+  @Test
+  def reopenAsReadonly() {
+    val first = OffsetPosition(51, 0)
+    val sec = OffsetPosition(52, 1)
+    idx.append(first.offset, first.position)
+    idx.append(sec.offset, sec.position)
+    idx.close()
+    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset, mutable = false)
+    assertEquals(first, idxRo.lookup(first.offset))
+    assertEquals(sec, idxRo.lookup(sec.offset))
+    assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalStateException])
+  }
+  
+  @Test
+  def truncate() {
+	val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, mutable = true, maxIndexSize = 10 * 8)
+    for(i <- 1 until 10)
+      idx.append(i, i)
+      
+    idx.truncateTo(12)
+    assertEquals("Index should be unchanged by truncate past the end", OffsetPosition(9, 9), idx.lookup(10))
+    idx.truncateTo(10)
+    assertEquals("Index should be unchanged by truncate at the end", OffsetPosition(9, 9), idx.lookup(10))
+    idx.truncateTo(9)
+    assertEquals("Index should truncate off last entry", OffsetPosition(8, 8), idx.lookup(10))
+    idx.truncateTo(5)
+    assertEquals("4 should be the last entry in the index", OffsetPosition(4, 4), idx.lookup(10))
+    assertEquals("4 should be the last entry in the index", 4, idx.lastOffset)
+    
+    idx.truncate()
+    assertEquals("Full truncation should leave no entries", 0, idx.entries())
+  }
+  
+  def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, klass: Class[T]) {
+    try {
+      idx.append(offset, 1)
+      fail(message)
+    } catch {
+      case e: Exception => assertEquals("Got an unexpected exception.", klass, e.getClass)
+    }
+  }
+  
+  def makeIndex(baseOffset: Long, mutable: Boolean, vals: Seq[(Long, Int)]): OffsetIndex = {
+    val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = baseOffset, mutable = mutable, maxIndexSize = 2 * vals.size * 8)
+    for ((logical, physical) <- vals)
+      idx.append(logical, physical)
+    idx
+  }
+
+  def monotonicSeq(base: Int, len: Int): Seq[Int] = {
+    val rand = new Random(1L)
+    val vals = new mutable.ArrayBuffer[Int](len)
+    var last = base
+    for (i <- 0 until len) {
+      last += rand.nextInt(15) + 1
+      vals += last
+    }
+    vals
+  }
+  
+  def nonExistantTempFile(): File = {
+    val file = TestUtils.tempFile()
+    file.delete()
+    file
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala Mon Oct  8 19:13:24 2012
@@ -17,8 +17,10 @@
 
 package kafka.message
 
+import java.io.RandomAccessFile
 import junit.framework.Assert._
 import kafka.utils.TestUtils._
+import kafka.log.FileMessageSet
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 
@@ -59,11 +61,15 @@ trait BaseMessageSetTestCases extends JU
   }
 
   def testWriteToWithMessageSet(set: MessageSet) {
-    val channel = tempChannel()
-    val written = set.writeTo(channel, 0, 1024)
-    assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
-    val newSet = new FileMessageSet(channel, false)
-    checkEquals(set.iterator, newSet.iterator)
+    // do the write twice to ensure the message set is restored to its orginal state
+    for(i <- List(0,1)) {
+      val file = tempFile()
+      val channel = new RandomAccessFile(file, "rw").getChannel()
+      val written = set.writeTo(channel, 0, 1024)
+      assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
+      val newSet = new FileMessageSet(file, channel, false)
+      checkEquals(set.iterator, newSet.iterator)
+    }
   }
   
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Mon Oct  8 19:13:24 2012
@@ -18,6 +18,7 @@
 package kafka.message
 
 import java.nio._
+import java.util.concurrent.atomic.AtomicLong
 import junit.framework.Assert._
 import org.junit.Test
 import kafka.utils.TestUtils
@@ -27,24 +28,6 @@ class ByteBufferMessageSetTest extends B
 
   override def createMessageSet(messages: Seq[Message]): ByteBufferMessageSet = 
     new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
-  
-  @Test
-  def testSmallFetchSize() {
-    // create a ByteBufferMessageSet that doesn't contain a full message
-    // iterating it should get an InvalidMessageSizeException
-    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
-    val buffer = messages.buffer.slice
-    buffer.limit(10)
-    val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
-    try {
-      for (message <- messageSetWithNoFullMessage)
-        fail("shouldn't see any message")
-    }
-    catch {
-      case e: InvalidMessageSizeException => //this is expected
-      case e2 => fail("shouldn't see any other exceptions")
-    }
-  }
 
   @Test
   def testValidBytes() {
@@ -104,8 +87,6 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
-      //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.buffer.limit)
 
       //make sure shallow iterator is the same as deep iterator
       TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
@@ -118,9 +99,6 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
-      //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.buffer.limit)
-
       verifyShallowIterator(messageSet)
     }
 
@@ -137,9 +115,6 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
-      //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.buffer.limit)
-
       //make sure shallow iterator is the same as deep iterator
       TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
                                      TestUtils.getMessageIterator(mixedMessageSet.iterator))
@@ -158,17 +133,41 @@ class ByteBufferMessageSetTest extends B
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
-      //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.buffer.limit)
-
       verifyShallowIterator(mixedMessageSet)
     }
   }
+  
+  @Test
+  def testOffsetAssignment() {
+    val messages = new ByteBufferMessageSet(NoCompressionCodec,
+                                            new Message("hello".getBytes), 
+                                            new Message("there".getBytes), 
+                                            new Message("beautiful".getBytes))
+    val compressedMessages = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+                                                      messages = messages.map(_.message).toBuffer:_*)
+    // check uncompressed offsets 
+    checkOffsets(messages, 0)
+    var offset = 1234567
+    checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec), offset)
+    
+    // check compressed messages
+    checkOffsets(compressedMessages, 0)
+    checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec), offset)
+  }
+  
+  /* check that offsets are assigned based on byte offset from the given base offset */
+  def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) {
+    var offset = baseOffset
+    for(entry <- messages) {
+      assertEquals("Unexpected offset in message set iterator", offset, entry.offset)
+      offset += 1
+    }
+  }
 
   def verifyShallowIterator(messageSet: ByteBufferMessageSet) {
-      //make sure the offsets returned by a shallow iterator is a subset of that of a deep iterator
-      val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet
-      val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet
-      assertTrue(shallowOffsets.subsetOf(deepOffsets))
+    //make sure the offsets returned by a shallow iterator is a subset of that of a deep iterator
+    val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet
+    val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet
+    assertTrue(shallowOffsets.subsetOf(deepOffsets))
   }
 }

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.io.ByteArrayOutputStream
+import java.util.concurrent.atomic.AtomicLong
+import scala.collection._
+import kafka.utils.TestUtils
+import org.scalatest.junit.JUnitSuite
+import org.junit._
+import junit.framework.Assert._
+
+class MessageCompressionTest extends JUnitSuite {
+  
+  @Test
+  def testSimpleCompressDecompress() {
+    val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec)
+    if(isSnappyAvailable)
+      codecs += SnappyCompressionCodec
+    for(codec <- codecs)
+      testSimpleCompressDecompress(codec)
+  }
+  
+  def testSimpleCompressDecompress(compressionCodec: CompressionCodec) {
+    val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes))
+    val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages:_*)
+    assertEquals(compressionCodec, messageSet.shallowIterator.next.message.compressionCodec)
+    val decompressed = messageSet.iterator.map(_.message).toList
+    assertEquals(messages, decompressed)
+  }
+
+  @Test
+  def testComplexCompressDecompress() {
+    val messages = List(new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes))
+    val message = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = messages.slice(0, 2):_*)
+    val complexMessages = List(message.shallowIterator.next.message):::messages.slice(2,3)
+    val complexMessage = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = complexMessages:_*)
+    val decompressedMessages = complexMessage.iterator.map(_.message).toList
+    assertEquals(messages, decompressedMessages)
+  }
+  
+  def isSnappyAvailable(): Boolean = {
+    try {
+      val snappy = new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream())
+      true
+    } catch {
+      case e: UnsatisfiedLinkError => false
+    }
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala Mon Oct  8 19:13:24 2012
@@ -19,52 +19,78 @@ package kafka.message
 
 import java.util._
 import java.nio._
+import scala.collection._
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
 import kafka.utils.TestUtils
+import kafka.utils.Utils
+
+case class MessageTestVal(val key: Array[Byte], 
+                          val payload: Array[Byte], 
+                          val codec: CompressionCodec, 
+                          val message: Message)
 
 class MessageTest extends JUnitSuite {
   
-  var message: Message = null
-  val payload = "some bytes".getBytes()
-
+  var messages = new mutable.ArrayBuffer[MessageTestVal]()
+  
   @Before
   def setUp(): Unit = {
-    message = new Message(payload)
+    val keys = Array(null, "key".getBytes, "".getBytes)
+    val vals = Array("value".getBytes, "".getBytes)
+    val codecs = Array(NoCompressionCodec, GZIPCompressionCodec)
+    for(k <- keys; v <- vals; codec <- codecs)
+      messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
   }
   
   @Test
   def testFieldValues = {
-    TestUtils.checkEquals(ByteBuffer.wrap(payload), message.payload)
-    assertEquals(Message.CurrentMagicValue, message.magic)
-    assertEquals(69L, new Message(69, "hello".getBytes()).checksum)
+    for(v <- messages) {
+      TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload)
+      assertEquals(Message.CurrentMagicValue, v.message.magic)
+      if(v.message.hasKey)
+        TestUtils.checkEquals(ByteBuffer.wrap(v.key), v.message.key)
+      else
+        assertEquals(null, v.message.key)
+      assertEquals(v.codec, v.message.compressionCodec)
+    }
   }
 
   @Test
   def testChecksum() {
-    assertTrue("Auto-computed checksum should be valid", message.isValid)
-    val badChecksum = message.checksum + 1 % Int.MaxValue
-    val invalid = new Message(badChecksum, payload)
-    assertEquals("Message should return written checksum", badChecksum, invalid.checksum)
-    assertFalse("Message with invalid checksum should be invalid", invalid.isValid)
+    for(v <- messages) {
+      assertTrue("Auto-computed checksum should be valid", v.message.isValid)
+      // garble checksum
+      val badChecksum: Int = (v.message.checksum + 1 % Int.MaxValue).toInt
+      Utils.putUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum)
+      assertFalse("Message with invalid checksum should be invalid", v.message.isValid)
+    }
   }
   
   @Test
   def testEquality() = {
-    assertFalse("Should not equal null", message.equals(null))
-    assertFalse("Should not equal a random string", message.equals("asdf"))
-    assertTrue("Should equal itself", message.equals(message))
-    val copy = new Message(message.checksum, payload)
-    assertTrue("Should equal another message with the same content.", message.equals(copy))
+    for(v <- messages) {
+      assertFalse("Should not equal null", v.message.equals(null))
+      assertFalse("Should not equal a random string", v.message.equals("asdf"))
+      assertTrue("Should equal itself", v.message.equals(v.message))
+      val copy = new Message(bytes = v.payload, key = v.key, codec = v.codec)
+      assertTrue("Should equal another message with the same content.", v.message.equals(copy))
+    }
   }
   
   @Test
   def testIsHashable() = {
     // this is silly, but why not
-    val m = new HashMap[Message,Boolean]()
-    m.put(message, true)
-    assertNotNull(m.get(message))
+    val m = new HashMap[Message, Message]()
+    for(v <- messages)
+      m.put(v.message, v.message)
+    for(v <- messages)
+      assertEquals(v.message, m.get(v.message))
   }
   
 }
+ 	

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala Mon Oct  8 19:13:24 2012
@@ -1,4 +1,4 @@
-package unit.kafka.metrics
+package kafka.metrics
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -21,7 +21,6 @@ import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import java.util.concurrent.TimeUnit
 import junit.framework.Assert._
-import kafka.metrics.KafkaTimer
 import com.yammer.metrics.core.{MetricsRegistry, Clock}
 
 class KafkaTimerTest extends JUnit3Suite {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Mon Oct  8 19:13:24 2012
@@ -19,6 +19,7 @@ package kafka.producer
 
 import java.util.{LinkedList, Properties}
 import java.util.concurrent.LinkedBlockingQueue
+import java.io.IOException
 import junit.framework.Assert._
 import org.easymock.EasyMock
 import org.junit.Test
@@ -149,7 +150,7 @@ class AsyncProducerTest extends JUnit3Su
     for (producerData <- producerDataList)
       queue.put(producerData)
 
-    Thread.sleep(queueExpirationTime + 10)
+    Thread.sleep(queueExpirationTime + 100)
     EasyMock.verify(mockHandler)
     producerSendThread.shutdown
   }
@@ -354,6 +355,7 @@ class AsyncProducerTest extends JUnit3Su
 
   @Test
   def testBrokerListAndAsync() {
+    return
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("producer.type", "async")
@@ -401,7 +403,6 @@ class AsyncProducerTest extends JUnit3Su
     val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
     topicPartitionInfos.put("topic1", topic1Metadata)
 
-
     val msgs = TestUtils.getMsgStrings(2)
 
     // produce request for topic1 and partitions 0 and 1.  Let the first request fail
@@ -432,16 +433,10 @@ class AsyncProducerTest extends JUnit3Su
                                                       encoder = new StringEncoder,
                                                       producerPool = producerPool,
                                                       topicPartitionInfos)
-    try {
-      val data = List(
-        new ProducerData[Int,String](topic1, 0, msgs),
-        new ProducerData[Int,String](topic1, 1, msgs)
-      )
-      handler.handle(data)
-      handler.close()
-    } catch {
-      case e: Exception => fail("Not expected", e)
-    }
+    val data = List(new ProducerData[Int,String](topic1, 0, msgs),
+                    new ProducerData[Int,String](topic1, 1, msgs))
+    handler.handle(data)
+    handler.close()
 
     EasyMock.verify(mockSyncProducer)
     EasyMock.verify(producerPool)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Oct  8 19:13:24 2012
@@ -170,19 +170,14 @@ class ProducerTest extends JUnit3Suite w
 
     val messageSet = if(leader == server1.config.brokerId) {
       val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      response1.messageSet("new-topic", 0).iterator
+      response1.messageSet("new-topic", 0).iterator.toBuffer
     }else {
       val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      response2.messageSet("new-topic", 0).iterator
+      response2.messageSet("new-topic", 0).iterator.toBuffer
     }
-    assertTrue("Message set should have 1 message", messageSet.hasNext)
-
-    assertEquals(new Message("test1".getBytes), messageSet.next.message)
-    assertTrue("Message set should have 1 message", messageSet.hasNext)
-    assertEquals(new Message("test2".getBytes), messageSet.next.message)
-    if (messageSet.hasNext)
-      fail("Message set should not have any more messages, but received a message of %s"
-            .format(Utils.toString(messageSet.next.message.payload, "UTF-8")))
+    assertEquals("Should have fetched 2 messages", 2, messageSet.size)
+    assertEquals(new Message("test1".getBytes), messageSet(0).message)
+    assertEquals(new Message("test2".getBytes), messageSet(1).message)
     producer1.close()
 
     try {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Mon Oct  8 19:13:24 2012
@@ -21,10 +21,11 @@ import java.net.SocketTimeoutException
 import java.util.Properties
 import junit.framework.Assert
 import kafka.admin.CreateTopicCommand
+import kafka.common.{ErrorMapping}
 import kafka.integration.KafkaServerTestHarness
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message._
 import kafka.server.KafkaConfig
-import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
+import kafka.utils._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import kafka.api.{ProducerResponseStatus, ProducerRequestPartitionData}
@@ -105,21 +106,22 @@ class SyncProducerTest extends JUnit3Sui
     CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
 
-    val message1 = new Message(new Array[Byte](1000001))
+    val message1 = new Message(new Array[Byte](configs(0).maxMessageSize + 1))
     val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
     val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1))
 
     Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
     Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error)
-    Assert.assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).nextOffset)
+    Assert.assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
 
-    val message2 = new Message(new Array[Byte](1000000))
+    val safeSize = configs(0).maxMessageSize - Message.MessageOverhead - MessageSet.LogOverhead - 1
+    val message2 = new Message(new Array[Byte](safeSize))
     val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
     val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2))
 
     Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
     Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error)
-    Assert.assertEquals(messageSet2.sizeInBytes, response2.status(TopicAndPartition("test", 0)).nextOffset)
+    Assert.assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
   }
 
   @Test
@@ -163,13 +165,13 @@ class SyncProducerTest extends JUnit3Sui
     // the first and last message should have been accepted by broker
     Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic1", 0)).error)
     Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic3", 0)).error)
-    Assert.assertEquals(messages.sizeInBytes, response2.status(TopicAndPartition("topic1", 0)).nextOffset)
-    Assert.assertEquals(messages.sizeInBytes, response2.status(TopicAndPartition("topic3", 0)).nextOffset)
+    Assert.assertEquals(0, response2.status(TopicAndPartition("topic1", 0)).offset)
+    Assert.assertEquals(0, response2.status(TopicAndPartition("topic3", 0)).offset)
 
     // the middle message should have been rejected because broker doesn't lead partition
     Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort,
                         response2.status(TopicAndPartition("topic2", 0)).error)
-    Assert.assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).nextOffset)
+    Assert.assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).offset)
   }
 
   @Test



Mime
View raw message