kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1416254 - in /kafka/trunk/core/src: main/scala/kafka/log/ main/scala/kafka/utils/ test/scala/unit/kafka/log/ test/scala/unit/kafka/server/
Date Sun, 02 Dec 2012 20:55:27 GMT
Author: jkreps
Date: Sun Dec  2 20:55:27 2012
New Revision: 1416254

URL: http://svn.apache.org/viewvc?rev=1416254&view=rev
Log:
KAFKA-521 Missing files on last commit.


Added:
    kafka/trunk/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
Removed:
    kafka/trunk/core/src/main/scala/kafka/log/SegmentList.scala
    kafka/trunk/core/src/main/scala/kafka/utils/Range.scala
    kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/log/SegmentListTest.scala

Added: kafka/trunk/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala?rev=1416254&view=auto
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala (added)
+++ kafka/trunk/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala Sun Dec  2 20:55:27 2012
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import kafka.utils._
+import junit.framework.Assert._
+import java.util.{Random, Properties}
+import kafka.consumer.SimpleConsumer
+import org.junit.{After, Before, Test}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
+import kafka.utils.TestUtils._
+import kafka.common.{ErrorMapping, TopicAndPartition}
+
+class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val random = new Random() 
+  var logDir: File = null
+  var topicLogDir: File = null
+  var server: KafkaServer = null
+  var logSize: Int = 100
+  val brokerPort: Int = 9099
+  var simpleConsumer: SimpleConsumer = null
+  var time: Time = new MockTime()
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val config: Properties = createBrokerConfig(1, brokerPort)
+    val logDirPath = config.getProperty("log.dir")
+    logDir = new File(logDirPath)
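+    // MockTime lets the test control the broker's clock instead of relying on
+    // the wall clock.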
+    time = new MockTime()
+    server = TestUtils.createServer(new KafkaConfig(config), time)
+    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+  }
+
+  @After
+  override def tearDown() {
+    simpleConsumer.close()
+    server.shutdown()
+    Utils.rm(logDir)
+    super.tearDown()
+  }
+
+  @Test
+  def testGetOffsetsForUnknownTopic() {
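+    // An offset request for a topic that was never created should be answered
+    // with UnknownTopicOrPartitionCode rather than an empty offset list.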
+    val topicAndPartition = TopicAndPartition("foo", 0)
+    val request = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
+    val offsetResponse = simpleConsumer.getOffsetsBefore(request)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
+  }
+
+  @Test
+  def testGetOffsetsBeforeLatestTime() {
+    val topicPartition = "kafka-" + 0
+    val topic = topicPartition.split("-").head
+    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+    // setup brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+
+    val logManager = server.getLogManager
+    val log = logManager.getOrCreateLog(topic, part)
+
+    val message = new Message(Integer.toString(42).getBytes())
+    for(i <- 0 until 20)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+    log.flush()
+
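+    // With the 100-byte segment size set in createBrokerConfig (log.file.size),
+    // the 20 appends roll the log every five messages, so LatestTime returns
+    // the log end offset (20) followed by each segment's base offset, newest first.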
+    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
+
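+    // The same offsets should be served through the public OffsetRequest API
+    // once this broker has become the partition's leader.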
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
+      replicaId = 0)
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
+
+    // try to fetch using latest offset
+    val fetchResponse = simpleConsumer.fetch(
+      new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
+    assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
+  }
+
+  @Test
+  def testEmptyLogsGetOffsets() {
+    val topicPartition = "kafka-" + random.nextInt(10)
+    val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+    topicLogDir = new File(topicPartitionPath)
+    topicLogDir.mkdir
+
+    val topic = topicPartition.split("-").head
+
+    // setup brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+
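+    // The earliest offset of a log that never received a message must stay at
+    // 0; repeated requests should never observe it move to 1.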
+    var offsetChanged = false
+    for(i <- 1 to 14) {
+      val topicAndPartition = TopicAndPartition(topic, 0)
+      val offsetRequest =
+        OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
+      val consumerOffsets =
+        simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+
+      if(consumerOffsets(0) == 1) {
+        offsetChanged = true
+      }
+    }
+    assertFalse(offsetChanged)
+  }
+
+  @Test
+  def testGetOffsetsBeforeNow() {
+    val topicPartition = "kafka-" + random.nextInt(3)
+    val topic = topicPartition.split("-").head
+    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+    // setup brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+
+    val logManager = server.getLogManager
+    val log = logManager.getOrCreateLog(topic, part)
+    val message = new Message(Integer.toString(42).getBytes())
+    for(i <- 0 until 20)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+    log.flush()
+
+    val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
+
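+    // A timestamp newer than every segment's last-modified time matches all of
+    // them, so the result is the same as asking for LatestTime.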
+    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
+
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
+  }
+
+  @Test
+  def testGetOffsetsBeforeEarliestTime() {
+    val topicPartition = "kafka-" + random.nextInt(3)
+    val topic = topicPartition.split("-").head
+    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+    // setup brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+
+    val logManager = server.getLogManager
+    val log = logManager.getOrCreateLog(topic, part)
+    val message = new Message(Integer.toString(42).getBytes())
+    for(i <- 0 until 20)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+    log.flush()
+
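+    // However many segments exist, EarliestTime resolves to just the base
+    // offset of the oldest segment: 0 for a log that has never been truncated.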
+    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.EarliestTime, 10)
+
+    assertEquals(Seq(0L), offsets)
+
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest =
+      OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(0L), consumerOffsets)
+  }
+
+  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+    val props = new Properties
+    props.put("brokerid", nodeId.toString)
+    props.put("port", port.toString)
+    props.put("log.dir", getLogDir.getAbsolutePath)
+    props.put("log.flush.interval", "1")
+    props.put("enable.zookeeper", "false")
+    props.put("num.partitions", "20")
+    props.put("log.retention.hours", "10")
+    props.put("log.cleanup.interval.mins", "5")
+    props.put("log.file.size", logSize.toString)
+    props.put("zk.connect", zkConnect.toString)
+    props
+  }
+
+  private def getLogDir(): File = TestUtils.tempDir()
+
+}


