kafka-commits mailing list archives

From: j...@apache.org
Subject: [1/3] kafka git commit: KAFKA-4586; Add purgeDataBefore() API (KIP-107)
Date: Tue, 28 Mar 2017 16:59:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f3f9a9eaf -> 8b05ad406


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index 1e2749f..c9ea05f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -17,16 +17,21 @@
 package kafka.api
 
 import java.util.Collections
+import java.util.concurrent.TimeUnit
 
 import kafka.admin.AdminClient
+import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.server.KafkaConfig
+import java.lang.{Long => JLong}
 import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.ApiKeys
-import org.junit.{Before, Test}
+import org.apache.kafka.common.protocol.{Errors, ApiKeys}
+import org.apache.kafka.common.requests.DeleteRecordsRequest
+import org.junit.{After, Before, Test}
 import org.junit.Assert._
+import scala.collection.JavaConverters._
 
 class AdminClientTest extends IntegrationTestHarness with Logging {
 
@@ -58,18 +63,132 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Before
   override def setUp() {
-    super.setUp
+    super.setUp()
     client = AdminClient.createSimplePlaintext(this.brokerList)
     TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
   }
 
+  @After
+  override def tearDown() {
+    client.close()
+    super.tearDown()
+  }
+
   @Test
-  def testListGroups() {
-    consumers.head.subscribe(Collections.singletonList(topic))
+  def testSeekToBeginningAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(0L, consumer.position(tp))
+
+    client.deleteRecordsBefore(Map((tp, 5L))).get()
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(5L, consumer.position(tp))
+
+    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(10L, consumer.position(tp))
+  }
+
+  @Test
+  def testConsumeAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    var messageCount = 0
     TestUtils.waitUntilTrue(() => {
-      consumers.head.poll(0)
-      !consumers.head.assignment.isEmpty
-    }, "Expected non-empty assignment")
+      messageCount += consumer.poll(0).count()
+      messageCount == 10
+    }, "Expected 10 messages", 3000L)
+
+    client.deleteRecordsBefore(Map((tp, 3L))).get()
+    consumer.seek(tp, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count()
+      messageCount == 7
+    }, "Expected 7 messages", 3000L)
+
+    client.deleteRecordsBefore(Map((tp, 8L))).get()
+    consumer.seek(tp, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count()
+      messageCount == 2
+    }, "Expected 2 messages", 3000L)
+  }
+
+  @Test
+  def testLogStartOffsetCheckpoint() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+
+    for (i <- 0 until serverCount)
+      killBroker(i)
+    restartDeadBrokers()
+
+    client.close()
+    brokerList = TestUtils.bootstrapServers(servers, listenerName)
+    client = AdminClient.createSimplePlaintext(brokerList)
+
+    TestUtils.waitUntilTrue(() => {
+      // Need to retry if leader is not available for the partition
+      client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
+    }, "Expected low watermark of the partition to be 5L")
+  }
+
+  @Test
+  def testLogStartOffsetAfterDeleteRecords() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    client.deleteRecordsBefore(Map((tp, 3L))).get()
+
+    for (i <- 0 until serverCount)
+      assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
+  }
+
+  @Test
+  def testOffsetsForTimesAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    assertEquals(0L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+    client.deleteRecordsBefore(Map((tp, 5L))).get()
+    assertEquals(5L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+    assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp))
+  }
+
+  @Test
+  def testDeleteRecordsWithException() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    // Should get success result
+    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+    // OffsetOutOfRangeException if offset > high_watermark
+    assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
+    // TimeoutException if response is not available within user-specified timeout
+    assertEquals(DeleteRecordsResult(-1L, Errors.REQUEST_TIMED_OUT.exception()), client.deleteRecordsBefore(Map((tp, 5L))).get(0, TimeUnit.MILLISECONDS)(tp))
+
+    val nonExistPartition = new TopicPartition(topic, 3)
+    // LeaderNotAvailableException if user tries to delete records of a non-existent partition
+    assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
+                 client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
+  }
+
+  @Test
+  def testListGroups() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
 
     val groups = client.listAllGroupsFlattened
     assertFalse(groups.isEmpty)
@@ -80,11 +199,8 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testListAllBrokerVersionInfo() {
-    consumers.head.subscribe(Collections.singletonList(topic))
-    TestUtils.waitUntilTrue(() => {
-      consumers.head.poll(0)
-      !consumers.head.assignment.isEmpty
-    }, "Expected non-empty assignment")
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
     val brokerVersionInfos = client.listAllBrokerVersionInfo
     val brokers = brokerList.split(",")
     assertEquals(brokers.size, brokerVersionInfos.size)
@@ -98,11 +214,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testGetConsumerGroupSummary() {
-    consumers.head.subscribe(Collections.singletonList(topic))
-    TestUtils.waitUntilTrue(() => {
-      consumers.head.poll(0)
-      !consumers.head.assignment.isEmpty
-    }, "Expected non-empty assignment")
+    subscribeAndWaitForAssignment(topic, consumers.head)
 
     val group = client.describeConsumerGroup(groupId)
     assertEquals("range", group.assignmentStrategy)
@@ -117,11 +229,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testDescribeConsumerGroup() {
-    consumers.head.subscribe(Collections.singletonList(topic))
-    TestUtils.waitUntilTrue(() => {
-      consumers.head.poll(0)
-      !consumers.head.assignment.isEmpty
-    }, "Expected non-empty assignment")
+    subscribeAndWaitForAssignment(topic, consumers.head)
 
     val consumerGroupSummary = client.describeConsumerGroup(groupId)
     assertEquals(1, consumerGroupSummary.consumers.get.size)
@@ -133,4 +241,25 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
     val nonExistentGroup = "non" + groupId
     assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
   }
+
+  private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+    consumer.subscribe(Collections.singletonList(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0)
+      !consumer.assignment.isEmpty
+    }, "Expected non-empty assignment")
+  }
+
+  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                          numRecords: Int,
+                          tp: TopicPartition) {
+    val futures = (0 until numRecords).map { i =>
+      val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
+      debug(s"Sending this record: $record")
+      producer.send(record)
+    }
+
+    futures.foreach(_.get)
+  }
+
 }
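
A note for readers of this patch: the tests above drive the new KIP-107 client API end to end. Below is a minimal standalone sketch of the same calls, assuming a broker at localhost:9092 and a topic named "my-topic" (the object name, bootstrap string and offsets are illustrative; the AdminClient calls mirror the test code above).

import java.util.concurrent.TimeUnit

import kafka.admin.AdminClient
import kafka.admin.AdminClient.DeleteRecordsResult
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.DeleteRecordsRequest

object DeleteRecordsBeforeSketch {
  def main(args: Array[String]): Unit = {
    val client = AdminClient.createSimplePlaintext("localhost:9092")
    val tp = new TopicPartition("my-topic", 0)

    // Ask the partition leader to advance the log start offset to 5; the future yields a
    // Map[TopicPartition, DeleteRecordsResult], where a result's first component is the new
    // low watermark and its second is null on success or an exception on failure (see the
    // assertions in testDeleteRecordsWithException above).
    val result: DeleteRecordsResult = client.deleteRecordsBefore(Map((tp, 5L))).get()(tp)
    println(result)

    // DeleteRecordsRequest.HIGH_WATERMARK purges everything below the current high watermark.
    // The timed get() bounds the wait; per the test above, a partition that does not respond
    // in time comes back with a REQUEST_TIMED_OUT error rather than throwing.
    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get(5000L, TimeUnit.MILLISECONDS)

    client.close()
  }
}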

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index cb4c235..e4cece9 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -190,7 +190,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createFetchRequest = {
     val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
-    partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 100))
+    partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100))
     val version = ApiKeys.FETCH.latestVersion
     requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, partitionMap).build()
   }
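
The extra argument above reflects the request-side change in this patch: a fetch partition entry now carries the fetcher's log start offset in addition to the fetch offset and maximum bytes. A minimal sketch of building such an entry, assuming the parameter order (fetchOffset, logStartOffset, maxBytes) that the updated call sites in this patch use (topic name, offsets and object name are placeholders):

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest

object FetchPartitionDataSketch {
  def main(args: Array[String]): Unit = {
    val tp = new TopicPartition("my-topic", 0)
    val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
    // (fetchOffset, logStartOffset, maxBytes); the tests in this patch pass 0 for the new field
    partitionMap.put(tp, new FetchRequest.PartitionData(0L, 0L, 100))
    println(partitionMap)
  }
}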

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 4ec77a1..2198bf2 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -251,6 +251,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     restartDeadBrokers()
     checkClosedState(dynamicGroup, 0)
     checkClosedState(manualGroup, numRecords)
+    adminClient.close()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 7870485..ae76eb6 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -92,7 +92,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
         override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
           fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
             (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE,
-              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, null)))
+              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null)))
           }
         }
       }
@@ -111,7 +111,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
 
       override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
         new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
-          quotaManagers.follower) {
+          quotaManagers.follower, metadataCache) {
 
           override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
                                                              quotaManager: ReplicationQuotaManager) =
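
On the response side, the same change renames FetchResponse.INVALID_LSO to INVALID_LAST_STABLE_OFFSET and inserts a new log start offset argument (INVALID_LOG_START_OFFSET here) before the aborted-transactions and records arguments; the second hunk shows that the ReplicaManager constructor now also takes a MetadataCache. A minimal sketch mirroring the updated PartitionData constructor call above (object and method names are illustrative):

import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.FetchResponse

object FetchResponsePartitionDataSketch {
  // Same argument order as the call above: error, high watermark, last stable offset,
  // log start offset, aborted transactions, records
  def outOfRangePartitionData(): FetchResponse.PartitionData =
    new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE,
      FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
      FetchResponse.INVALID_LOG_START_OFFSET, null, null)

  def main(args: Array[String]): Unit =
    println(outOfRangePartitionData())
}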

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index aff3b2f..0c1297f 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -43,6 +43,7 @@ object StressTestLog {
 
     val log = new Log(dir = dir,
                       config = LogConfig(logProperties),
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
                       time = time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 9c29679..34c6775 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -206,7 +206,7 @@ object TestLinearWriteSpeed {
   
   class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
     Utils.delete(dir)
-    val log = new Log(dir, config, 0L, scheduler, Time.SYSTEM)
+    val log = new Log(dir, config, 0L, 0L, scheduler, Time.SYSTEM)
     def write(): Int = {
       log.append(messages, true)
       messages.sizeInBytes

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 0b1978c..5f97708 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -53,7 +53,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
     val logProps = new Properties()
     logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
     /*configure broker-side compression  */
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append two messages */
     log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec),

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index f2dbc6e..3e91f96 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -324,6 +324,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
       val log = new Log(dir = dir,
                         LogConfig(logConfigProperties(propertyOverrides, maxMessageSize, minCleanableDirtyRatio)),
+                        logStartOffset = 0L,
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 4d8c836..05d9060 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -151,6 +151,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
 
       val log = new Log(dir = dir,
         LogConfig(logProps),
+        logStartOffset = 0L,
         recoveryPoint = 0L,
         scheduler = time.scheduler,
         time = time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 3690c55..94207ec 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -183,6 +183,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     val partitionDir = new File(logDir, "log-0")
     val log = new Log(partitionDir,
       config,
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -190,7 +191,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   }
 
   private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
   private def records(key: Int, value: Int, timestamp: Long) =
     MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 18c1bbe..38eb94c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -775,7 +775,7 @@ class LogCleanerTest extends JUnitSuite {
     messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
 
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
 
   def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */  }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c87e927..768c073 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -75,6 +75,7 @@ class LogTest extends JUnitSuite {
     // create a log
     val log = new Log(logDir,
                       LogConfig(logProps),
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
                       time = time)
@@ -134,6 +135,7 @@ class LogTest extends JUnitSuite {
     // create a log
     val log = new Log(logDir,
       LogConfig(logProps),
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       scheduler = time.scheduler,
       time = time)
@@ -165,7 +167,7 @@ class LogTest extends JUnitSuite {
     // We need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     // create a log
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -181,7 +183,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds))
   }
 
@@ -194,7 +196,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     // We need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
     for(value <- values)
@@ -218,7 +220,7 @@ class LogTest extends JUnitSuite {
   def testAppendAndReadWithNonSequentialOffsets() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -243,7 +245,7 @@ class LogTest extends JUnitSuite {
   def testReadAtLogGap() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
@@ -260,7 +262,7 @@ class LogTest extends JUnitSuite {
   def testReadWithMinMessage() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -288,7 +290,7 @@ class LogTest extends JUnitSuite {
   def testReadWithTooSmallMaxLength() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -324,7 +326,7 @@ class LogTest extends JUnitSuite {
 
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     log.append(TestUtils.singletonRecords(value = "42".getBytes))
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
@@ -355,7 +357,7 @@ class LogTest extends JUnitSuite {
     /* create a multipart log with 100 messages */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
                                                                                 timestamp = time.milliseconds))
@@ -393,7 +395,7 @@ class LogTest extends JUnitSuite {
     /* this log should roll after every messageset */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)))
@@ -419,7 +421,7 @@ class LogTest extends JUnitSuite {
       val logProps = new Properties()
       logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
       logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
-      val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+      val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10))
 
@@ -455,7 +457,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
     // We need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
 
     try {
       log.append(messageSet)
@@ -482,7 +484,7 @@ class LogTest extends JUnitSuite {
     val logProps = new Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
 
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
 
     try {
       log.append(messageSetWithUnkeyedMessage)
@@ -524,7 +526,7 @@ class LogTest extends JUnitSuite {
     val maxMessageSize = second.sizeInBytes - 1
     val logProps = new Properties()
     logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -550,7 +552,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
         timestamp = time.milliseconds + i * 10))
@@ -576,12 +578,12 @@ class LogTest extends JUnitSuite {
       assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
     }
 
-    log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, time.scheduler, time)
     verifyRecoveredLog(log)
     log.close()
 
     // test recovery case
-    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     verifyRecoveredLog(log)
     log.close()
   }
@@ -597,7 +599,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
 
     val messages = (0 until numMessages).map { i =>
       MemoryRecords.withRecords(100 + i, CompressionType.NONE, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
@@ -621,7 +623,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -633,7 +635,7 @@ class LogTest extends JUnitSuite {
     timeIndexFiles.foreach(_.delete())
 
     // reopen the log
-    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
@@ -660,7 +662,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
@@ -670,7 +672,7 @@ class LogTest extends JUnitSuite {
     timeIndexFiles.foreach(_.delete())
 
     // The rebuilt time index should be empty
-    log = new Log(logDir, config, recoveryPoint = numMessages + 1, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, time.scheduler, time)
     val segArray = log.logSegments.toArray
     for (i <- 0 until segArray.size - 1) {
       assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
@@ -691,7 +693,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
@@ -713,7 +715,7 @@ class LogTest extends JUnitSuite {
     }
 
     // reopen the log
-    log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
@@ -739,7 +741,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
 
     // create a log
-    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (_ <- 1 to msgPerSeg)
@@ -794,7 +796,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, setSize - 1: java.lang.Integer)
     val config = LogConfig(logProps)
-    val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
@@ -838,6 +840,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     val log = new Log(logDir,
                       LogConfig(logProps),
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -869,6 +872,7 @@ class LogTest extends JUnitSuite {
     // create a log
     var log = new Log(logDir,
                       config,
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -879,6 +883,7 @@ class LogTest extends JUnitSuite {
     log.close()
     log = new Log(logDir,
                   config,
+                  logStartOffset = 0L,
                   recoveryPoint = 0L,
                   time.scheduler,
                   time)
@@ -904,6 +909,7 @@ class LogTest extends JUnitSuite {
 
     val log = new Log(logDir,
                       config,
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -944,6 +950,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir,
                       config,
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -958,6 +965,7 @@ class LogTest extends JUnitSuite {
 
     log = new Log(logDir,
                   config,
+                  logStartOffset = 0L,
                   recoveryPoint = 0L,
                   time.scheduler,
                   time)
@@ -968,6 +976,7 @@ class LogTest extends JUnitSuite {
   def testAppendMessageWithNullPayload() {
     val log = new Log(logDir,
                       LogConfig(),
+                      logStartOffset = 0L,
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -981,6 +990,7 @@ class LogTest extends JUnitSuite {
   def testAppendWithOutOfOrderOffsetsThrowsException() {
     val log = new Log(logDir,
       LogConfig(),
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -994,6 +1004,7 @@ class LogTest extends JUnitSuite {
   def testAppendWithNoTimestamp(): Unit = {
     val log = new Log(logDir,
       LogConfig(),
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -1016,6 +1027,7 @@ class LogTest extends JUnitSuite {
       logDir.mkdirs()
       var log = new Log(logDir,
                         config,
+                        logStartOffset = 0L,
                         recoveryPoint = 0L,
                         time.scheduler,
                         time)
@@ -1030,7 +1042,7 @@ class LogTest extends JUnitSuite {
       TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
 
       // attempt recovery
-      log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
+      log = new Log(logDir, config, 0L, recoveryPoint, time.scheduler, time)
       assertEquals(numMessages, log.logEndOffset)
 
       val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList
@@ -1058,6 +1070,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     val log = new Log(logDir,
       config,
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -1106,6 +1119,7 @@ class LogTest extends JUnitSuite {
     // create a log and write some messages to it
     var log = new Log(logDir,
       config,
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -1116,7 +1130,7 @@ class LogTest extends JUnitSuite {
     // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
     // clean shutdown file exists.
     recoveryPoint = log.logEndOffset
-    log = new Log(logDir, config, 0L, time.scheduler, time)
+    log = new Log(logDir, config, 0L, 0L, time.scheduler, time)
     assertEquals(recoveryPoint, log.logEndOffset)
     cleanShutdownFile.delete()
   }
@@ -1205,6 +1219,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     val log = new Log(logDir,
       config,
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
@@ -1226,6 +1241,31 @@ class LogTest extends JUnitSuite {
     assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
   }
 
+  @Test
+  def testLogDeletionAfterDeleteRecords() {
+    val set = TestUtils.singletonRecords("test".getBytes)
+    val log = createLog(set.sizeInBytes)
+
+    for (_ <- 0 until 15)
+      log.append(set)
+    assertEquals("should have 3 segments", 3, log.numberOfSegments)
+    assertEquals(log.logStartOffset, 0)
+
+    log.maybeIncrementLogStartOffset(1)
+    log.deleteOldSegments()
+    assertEquals("should have 3 segments", 3, log.numberOfSegments)
+    assertEquals(log.logStartOffset, 1)
+
+    log.maybeIncrementLogStartOffset(6)
+    log.deleteOldSegments()
+    assertEquals("should have 2 segments", 2, log.numberOfSegments)
+    assertEquals(log.logStartOffset, 6)
+
+    log.maybeIncrementLogStartOffset(15)
+    log.deleteOldSegments()
+    assertEquals("should have 1 segments", 1, log.numberOfSegments)
+    assertEquals(log.logStartOffset, 15)
+  }
 
   @Test
   def shouldDeleteSizeBasedSegments() {
@@ -1324,6 +1364,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     val log = new Log(logDir,
       config,
+      logStartOffset = 0L,
       recoveryPoint = 0L,
       time.scheduler,
       time)
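
The recurring logStartOffset = 0L argument in the Log constructors above is the new parameter added by this patch, and testLogDeletionAfterDeleteRecords exercises it together with maybeIncrementLogStartOffset() and deleteOldSegments(). A minimal standalone sketch along the same lines (the object name, temp-dir usage and segment sizing are assumptions; the Log calls mirror the tests):

import java.util.Properties

import kafka.log.{Log, LogConfig}
import kafka.utils.{MockTime, TestUtils}

object LogStartOffsetSketch {
  def main(args: Array[String]): Unit = {
    val time = new MockTime
    val set = TestUtils.singletonRecords("test".getBytes)

    // Size segments to hold roughly five records each so that several segments exist
    val logProps = new Properties()
    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)

    // logStartOffset is the new constructor parameter
    val log = new Log(TestUtils.tempDir(), LogConfig(logProps), logStartOffset = 0L,
      recoveryPoint = 0L, time.scheduler, time)

    for (_ <- 0 until 15)
      log.append(set)

    // Raising the log start offset makes whole segments below it eligible for deletion
    log.maybeIncrementLogStartOffset(6)
    log.deleteOldSegments()
    println(s"logStartOffset=${log.logStartOffset} segments=${log.numberOfSegments}")

    log.close()
  }
}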

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 581a917..3babfc8 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -121,7 +121,7 @@ class AbstractFetcherThreadTest {
       fetchRequest.offsets.mapValues(_ => new TestPartitionData()).toSeq
 
     override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
-      new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.offset) }.toMap)
+      new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.fetchOffset) }.toMap)
   }
 
 
@@ -199,7 +199,7 @@ class AbstractFetcherThreadTest {
       partitionMap.foreach { case (topicPartition, partitionFetchState) =>
         // Add backoff delay check
         if (partitionFetchState.isActive)
-          requestMap.put(topicPartition, partitionFetchState.offset)
+          requestMap.put(topicPartition, partitionFetchState.fetchOffset)
       }
       new DummyFetchRequest(requestMap)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index b350732..eefb35e 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -62,7 +62,7 @@ class FetchRequestTest extends BaseRequestTest {
                                  offsetMap: Map[TopicPartition, Long] = Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {
     val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
     topicPartitions.foreach { tp =>
-      partitionMap.put(tp, new FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), maxPartitionBytes))
+      partitionMap.put(tp, new FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), 0L, maxPartitionBytes))
     }
     partitionMap
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 948b5ec..55cfa27 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -60,7 +60,8 @@ class HighwatermarkPersistenceTest {
     val time = new MockTime
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
-      logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+      logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
+      new MetadataCache(configs.head.brokerId))
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -104,7 +105,8 @@ class HighwatermarkPersistenceTest {
     val time = new MockTime
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
-      scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+      scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
+      new MetadataCache(configs.head.brokerId))
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 348bfc3..3898d2b 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -53,7 +53,7 @@ class IsrExpirationTest {
   @Before
   def setUp() {
     replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, null, new AtomicBoolean(false),
-      QuotaFactory.instantiate(configs.head, metrics, time).follower)
+      QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
   }
 
   @After
@@ -78,7 +78,9 @@ class IsrExpirationTest {
     for (replica <- partition0.assignedReplicas - leaderReplica)
       replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
                                                     hw = 15L,
+                                                    leaderLogStartOffset = 0L,
                                                     leaderLogEndOffset = 15L,
+                                                    followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1))
     var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
@@ -130,7 +132,9 @@ class IsrExpirationTest {
     for (replica <- partition0.assignedReplicas - leaderReplica)
       replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
                                                     hw = 10L,
+                                                    leaderLogStartOffset = 0L,
                                                     leaderLogEndOffset = 15L,
+                                                    followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1))
 
@@ -144,7 +148,9 @@ class IsrExpirationTest {
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
       r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
                             hw = 11L,
+                            leaderLogStartOffset = 0L,
                             leaderLogEndOffset = 15L,
+                            followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
                             readSize = -1))
     }
@@ -161,7 +167,9 @@ class IsrExpirationTest {
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
       r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
                             hw = 15L,
+                            leaderLogStartOffset = 0L,
                             leaderLogEndOffset = 15L,
+                            followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
                             readSize = -1))
     }
@@ -185,7 +193,9 @@ class IsrExpirationTest {
     for (replica <- partition.assignedReplicas - leaderReplica)
       replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY),
                                                     hw = 0L,
+                                                    leaderLogStartOffset = 0L,
                                                     leaderLogEndOffset = 0L,
+                                                    followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1))
     // set the leader and its hw and the hw update time

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index c89e626..c27f8c5 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -587,6 +587,7 @@ class KafkaConfigTest {
         case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
         case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
@@ -603,6 +604,7 @@ class KafkaConfigTest {
         case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
         case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
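
The two new cases cover the broker configs introduced with this feature: the checkpoint interval for log start offsets and the purge interval (in requests) of the new DeleteRecords purgatory. A minimal sketch of setting them through KafkaConfig (ZooKeeper address, broker id and the values shown are placeholders; only the property constants come from this patch):

import java.util.Properties

import kafka.server.KafkaConfig

object Kip107ConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
    props.put(KafkaConfig.BrokerIdProp, "0")
    // Illustrative values for the two properties validated above
    props.put(KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp, "60000")
    props.put(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp, "1")
    val config = KafkaConfig.fromProps(props)
    println(config.brokerId)
  }
}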

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 9386a1d..f65884e 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -20,6 +20,7 @@ package kafka.server
 import java.io.File
 import java.util.concurrent.atomic.AtomicLong
 import java.util.{Properties, Random}
+import java.lang.{Long => JLong}
 
 import kafka.admin.AdminUtils
 import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
@@ -31,10 +32,13 @@ import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.requests.DeleteRecordsRequest
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import scala.collection.JavaConverters._
 
 class LogOffsetTest extends ZooKeeperTestHarness {
   val random = new Random()
@@ -76,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testGetOffsetsBeforeLatestTime() {
+  def testGetOffsetsAfterDeleteRecords() {
     val topicPartition = "kafka-" + 0
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
@@ -93,6 +97,40 @@ class LogOffsetTest extends ZooKeeperTestHarness {
       log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
     log.flush()
 
+    log.maybeIncrementLogStartOffset(3)
+    log.deleteOldSegments()
+
+    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
+    assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), offsets)
+
+    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 15)),
+      replicaId = 0)
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
+  }
+
+  @Test
+  def testGetOffsetsBeforeLatestTime() {
+    val topicPartition = "kafka-" + 0
+    val topic = topicPartition.split("-").head
+    val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+    // setup brokers in zookeeper as owners of partitions for this test
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+
+    val logManager = server.getLogManager
+    waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(new TopicPartition(topic, part)).get
+
+    for (_ <- 0 until 20)
+      log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
+    log.flush()
+
     val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f856e52..a720a6a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -40,7 +40,7 @@ class ReplicaManagerQuotasTest {
   val record = new SimpleRecord("some-data-in-a-message".getBytes())
   val topicPartition1 = new TopicPartition("test-topic", 1)
   val topicPartition2 = new TopicPartition("test-topic", 2)
-  val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 100), topicPartition2 -> new PartitionData(0, 100))
+  val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 0, 100), topicPartition2 -> new PartitionData(0, 0, 100))
   var replicaManager: ReplicaManager = null
 
   @Test
@@ -147,6 +147,7 @@ class ReplicaManagerQuotasTest {
 
     //Create log which handles both a regular read and a 0 bytes read
     val log = createMock(classOf[Log])
+    expect(log.logStartOffset).andReturn(0L).anyTimes()
     expect(log.logEndOffset).andReturn(20L).anyTimes()
     expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
 
@@ -173,7 +174,7 @@ class ReplicaManagerQuotasTest {
     replay(logManager)
 
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
 
     //create the two replicas
     for ((p, _) <- fetchInfo) {

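The quota test changes above show the two broker-side signatures KIP-107 touches: the per-partition fetch data gains a middle log-start-offset argument (new PartitionData(0, 0, 100) instead of new PartitionData(0, 100)), and ReplicaManager now takes a MetadataCache in its constructor. A hedged sketch of the new fetch-info shape, assuming PartitionData here is org.apache.kafka.common.requests.FetchRequest.PartitionData (the import is not visible in these hunks); the tests simply pass 0 for the new argument:

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest

object FetchInfoSketch {
  val topicPartition1 = new TopicPartition("test-topic", 1)

  // (fetchOffset, logStartOffset, maxBytes): the middle value is the fetching
  // replica's log start offset, which is what KIP-107 adds to FetchRequest v5.
  val fetchInfo = Seq(topicPartition1 -> new FetchRequest.PartitionData(0L, 0L, 100))
}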
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d48e4f2..e00c142 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -65,7 +65,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId))
     try {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
       partition.getOrCreateReplica(1)
@@ -83,7 +83,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId))
     try {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
       partition.getOrCreateReplica(1)
@@ -100,7 +100,7 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId), Option(this.getClass.getName))
     try {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
         assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS)
@@ -127,8 +127,12 @@ class ReplicaManagerTest {
     val logProps = new Properties()
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
+    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
+    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+    EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache)
 
     try {
       var produceCallbackFired = false
@@ -145,11 +149,6 @@ class ReplicaManagerTest {
         fetchCallbackFired = true
       }
 
-      val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
-      val metadataCache = EasyMock.createMock(classOf[MetadataCache])
-      EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
-      EasyMock.replay(metadataCache)
-
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
       val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava
 
@@ -159,7 +158,7 @@ class ReplicaManagerTest {
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
       // Append a message.
@@ -178,14 +177,14 @@ class ReplicaManagerTest {
         fetchMinBytes = 100000,
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 100000)),
+        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)),
         responseCallback = fetchCallback)
 
       // Make this replica the follower
       val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerSet)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, metadataCache, (_, _) => {})
+      rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => {})
 
       assertTrue(produceCallbackFired)
       assertTrue(fetchCallbackFired)
@@ -203,13 +202,16 @@ class ReplicaManagerTest {
     val logProps = new Properties()
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
+    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1), createBroker(1, "host2", 2))
+    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes()
+    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
+    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(2))).andReturn(true).anyTimes()
+    EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName))
     try {
-      val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1), createBroker(1, "host2", 2))
-      val metadataCache = EasyMock.createMock(classOf[MetadataCache])
-      EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
-      EasyMock.replay(metadataCache)
       
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
       val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
@@ -221,7 +223,7 @@ class ReplicaManagerTest {
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
-      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {})
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
       def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
@@ -251,7 +253,7 @@ class ReplicaManagerTest {
         fetchMinBytes = 0,
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 100000)),
+        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
         responseCallback = fetchCallback)
         
       
@@ -267,7 +269,7 @@ class ReplicaManagerTest {
         fetchMinBytes = 0,
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
-        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 100000)),
+        fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)),
         responseCallback = fetchCallback)
           
         assertTrue(fetchCallbackFired)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index f33c73a..0129d5d 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -59,7 +59,7 @@ class SimpleFetchTest {
   val partitionId = 0
   val topicPartition = new TopicPartition(topic, partitionId)
 
-  val fetchInfo = Seq(topicPartition -> new PartitionData(0, fetchSize))
+  val fetchInfo = Seq(topicPartition -> new PartitionData(0, 0, fetchSize))
 
   var replicaManager: ReplicaManager = null
 
@@ -75,6 +75,7 @@ class SimpleFetchTest {
 
     // create the log which takes read with either HW max offset or none max offset
     val log = EasyMock.createMock(classOf[Log])
+    EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
     EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true)).andReturn(
@@ -96,7 +97,7 @@ class SimpleFetchTest {
 
     // create the replica manager
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
+      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
 
     // add the partition with two replicas, both in ISR
     val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
@@ -111,7 +112,9 @@ class SimpleFetchTest {
     val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
     followerReplica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(leo, MemoryRecords.EMPTY),
                                                           hw = leo.messageOffset,
+                                                          leaderLogStartOffset = 0L,
                                                           leaderLogEndOffset = leo.messageOffset,
+                                                          followerLogStartOffset = 0L,
                                                           fetchTimeMs = time.milliseconds,
                                                           readSize = -1))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1ffc7c3..9ae7195 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -934,7 +934,8 @@ object TestUtils extends Logging {
                    cleanerConfig = cleanerConfig,
                    ioThreads = 4,
                    flushCheckMs = 1000L,
-                   flushCheckpointMs = 10000L,
+                   flushRecoveryOffsetCheckpointMs = 10000L,
+                   flushStartOffsetCheckpointMs = 10000L,
                    retentionCheckMs = 1000L,
                    scheduler = time.scheduler,
                    time = time,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ec29da7..109097c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -65,6 +65,12 @@
         producer's <code>batch.size</code> configuration.</li>
 </ul>
 
+<h5><a id="upgrade_1100_new_protocols" href="#upgrade_1100_new_protocols">New Protocol Versions</a></h5>
+<ul>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore()+API+in+AdminClient">KIP-107</a>: FetchRequest v5 introduces a partition-level <code>log_start_offset</code> field. </li>
+    <li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore()+API+in+AdminClient">KIP-107</a>: FetchResponse v5 introduces a partition-level <code>log_start_offset</code> field. </li>
+</ul>
+
 <h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>
 <p>0.10.2.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
 However, please review the <a href="#upgrade_1020_notable">notable changes in 0.10.2.0</a> before upgrading.

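The upgrade note above is the protocol-level summary of these test changes: FetchRequest v5 lets a follower report its log start offset to the leader, and FetchResponse v5 lets the leader hand its own log start offset back, which is why LogReadResult in SimpleFetchTest now carries leaderLogStartOffset and followerLogStartOffset. A conceptual plain-Scala sketch of that propagation, not the broker's code; clamping against the follower's own log end offset is an assumption of the sketch:

final case class FollowerPartitionState(logStartOffset: Long, logEndOffset: Long)

object LogStartOffsetPropagationSketch {
  // Fold the leader's log_start_offset from a v5 fetch response into the
  // follower's state: never move backwards, never past the follower's own end.
  def onFetchResponse(state: FollowerPartitionState, leaderLogStartOffset: Long): FollowerPartitionState =
    state.copy(logStartOffset =
      math.min(state.logEndOffset, math.max(state.logStartOffset, leaderLogStartOffset)))

  def main(args: Array[String]): Unit = {
    val before = FollowerPartitionState(logStartOffset = 0L, logEndOffset = 10L)
    println(onFetchResponse(before, leaderLogStartOffset = 5L)) // FollowerPartitionState(5,10)
  }
}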
