kafka-commits mailing list archives

From jun...@apache.org
Subject [1/3] kafka git commit: KIP-101: Alter Replication Protocol to use Leader Epoch rather than High Watermark for Truncation
Date Thu, 06 Apr 2017 21:51:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b611cfa5c -> 0baea2ac1


http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index f868032..15e77a0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -242,4 +242,4 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val metricName = broker.metrics.metricName("byte-rate", repType.toString)
     broker.metrics.metrics.asScala(metricName).value
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 0129d5d..ba17db6 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -74,9 +74,10 @@ class SimpleFetchTest {
     EasyMock.replay(scheduler)
 
     // create the log which takes read with either HW max offset or none max offset
-    val log = EasyMock.createMock(classOf[Log])
+    val log = EasyMock.createNiceMock(classOf[Log])
     EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
+    EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
     EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true)).andReturn(
       FetchDataInfo(
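
The switch above from createMock to createNiceMock is what lets the test tolerate the new log.dir access: a nice mock returns type-appropriate defaults (null, 0, false) for calls that were never explicitly expected, whereas a strict mock fails the test on the first unexpected call. A minimal sketch of the difference, using a hypothetical Counter trait rather than Kafka's Log:

    import org.easymock.EasyMock

    // Hypothetical trait, for illustration only
    trait Counter {
      def value: Int
      def dir: String
    }

    object NiceMockSketch extends App {
      // Strict mock: any call not set up via expect(...) throws once replayed
      val strict = EasyMock.createMock(classOf[Counter])
      EasyMock.expect(strict.value).andReturn(5).anyTimes()
      EasyMock.replay(strict)
      println(strict.value)    // 5
      // strict.dir            // would throw AssertionError: unexpected method call

      // Nice mock: unexpected calls return defaults instead of failing
      val nice = EasyMock.createNiceMock(classOf[Counter])
      EasyMock.expect(nice.value).andReturn(5).anyTimes()
      EasyMock.replay(nice)
      println(nice.value)      // 5
      println(nice.dir)        // null, no failure
    }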

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
new file mode 100644
index 0000000..e7c6a97
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
@@ -0,0 +1,72 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server.checkpoints
+
+import java.io.File
+
+import kafka.server.epoch.EpochEntry
+import kafka.utils.Logging
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+
+class LeaderEpochCheckpointFileTest extends JUnitSuite with Logging{
+
+  @Test
+  def shouldPersistAndOverwriteAndReloadFile(): Unit ={
+    val file = File.createTempFile("temp-checkpoint-file", System.nanoTime().toString)
+    file.deleteOnExit()
+
+    val checkpoint = new LeaderEpochCheckpointFile(file)
+
+    //Given
+    val epochs = Seq(EpochEntry(0, 1L), EpochEntry(1, 2L), EpochEntry(2, 3L))
+
+    //When
+    checkpoint.write(epochs)
+
+    //Then
+    assertEquals(epochs, checkpoint.read())
+
+    //Given overwrite
+    val epochs2 = Seq(EpochEntry(3, 4L), EpochEntry(4, 5L))
+
+    //When
+    checkpoint.write(epochs2)
+
+    //Then
+    assertEquals(epochs2, checkpoint.read())
+  }
+
+  @Test
+  def shouldRetainValuesEvenIfCheckpointIsRecreated(): Unit ={
+    val file = File.createTempFile("temp-checkpoint-file", System.nanoTime().toString)
+    file.deleteOnExit()
+
+    //Given a file with data in it
+    val checkpoint = new LeaderEpochCheckpointFile(file)
+    val epochs = Seq(EpochEntry(0, 1L), EpochEntry(1, 2L), EpochEntry(2, 3L))
+    checkpoint.write(epochs)
+
+    //When we recreate
+    val checkpoint2 = new LeaderEpochCheckpointFile(file)
+
+    //The data should still be there
+    assertEquals(epochs, checkpoint2.read())
+  }
+}
\ No newline at end of file
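
LeaderEpochCheckpointFile, exercised above, persists the (epoch, startOffset) pairs in a small plain-text file. Kafka's existing checkpoint files (e.g. replication-offset-checkpoint) use a version line, an entry-count line and then one whitespace-separated entry per line, and the leader epoch checkpoint presumably follows the same scheme; treat the layout below as an assumption. A self-contained sketch of writing and re-reading such a layout with plain java.io, where EpochRow is an illustrative stand-in for EpochEntry rather than a Kafka type:

    import java.io.{File, PrintWriter}
    import scala.io.Source

    // Illustrative stand-in for kafka.server.epoch.EpochEntry
    case class EpochRow(epoch: Int, startOffset: Long)

    object CheckpointLayoutSketch extends App {
      val rows = Seq(EpochRow(0, 1L), EpochRow(1, 2L), EpochRow(2, 3L))
      val file = File.createTempFile("epoch-checkpoint-sketch", ".tmp")
      file.deleteOnExit()

      // Assumed layout: version, entry count, then "epoch startOffset" per line
      val writer = new PrintWriter(file)
      try {
        writer.println(0)
        writer.println(rows.size)
        rows.foreach(r => writer.println(s"${r.epoch} ${r.startOffset}"))
      } finally writer.close()

      // Read it back, skipping the two header lines
      val source = Source.fromFile(file)
      val reloaded = try {
        source.getLines().drop(2).map { line =>
          val Array(e, o) = line.split("\\s+")
          EpochRow(e.toInt, o.toLong)
        }.toList
      } finally source.close()

      assert(reloaded == rows)   // the round trip preserves the entries
    }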

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
new file mode 100644
index 0000000..cc49ccf
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
@@ -0,0 +1,89 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package unit.kafka.server.checkpoints
+import kafka.server.checkpoints.{OffsetCheckpointFile}
+import kafka.utils.{Logging, TestUtils}
+import org.apache.kafka.common.TopicPartition
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+import scala.collection.Map
+
+class OffsetCheckpointFileTest extends JUnitSuite with Logging {
+
+  @Test
+  def shouldPersistAndOverwriteAndReloadFile(): Unit = {
+
+    val checkpoint = new OffsetCheckpointFile(TestUtils.tempFile())
+
+    //Given
+    val offsets = Map(new TopicPartition("foo", 1) -> 5L, new TopicPartition("bar", 2) -> 10L)
+
+    //When
+    checkpoint.write(offsets)
+
+    //Then
+    assertEquals(offsets, checkpoint.read())
+
+    //Given overwrite
+    val offsets2 = Map(new TopicPartition("foo", 2) -> 15L, new TopicPartition("bar", 3) -> 20L)
+
+    //When
+    checkpoint.write(offsets2)
+
+    //Then
+    assertEquals(offsets2, checkpoint.read())
+  }
+
+  @Test
+  def shouldHandleMultipleLines(): Unit = {
+
+    val checkpoint = new OffsetCheckpointFile(TestUtils.tempFile())
+
+    //Given
+    val offsets = Map(
+      new TopicPartition("foo", 1) -> 5L, new TopicPartition("bar", 6) -> 10L,
+      new TopicPartition("foo", 2) -> 5L, new TopicPartition("bar", 7) -> 10L,
+      new TopicPartition("foo", 3) -> 5L, new TopicPartition("bar", 8) -> 10L,
+      new TopicPartition("foo", 4) -> 5L, new TopicPartition("bar", 9) -> 10L,
+      new TopicPartition("foo", 5) -> 5L, new TopicPartition("bar", 10) -> 10L
+    )
+
+    //When
+    checkpoint.write(offsets)
+
+    //Then
+    assertEquals(offsets, checkpoint.read())
+  }
+
+  @Test
+  def shouldReturnEmptyMapForEmptyFile(): Unit = {
+
+    //When
+    val checkpoint = new OffsetCheckpointFile(TestUtils.tempFile())
+
+    //Then
+    assertEquals(Map(), checkpoint.read())
+
+    //When
+    checkpoint.write(Map())
+
+    //Then
+    assertEquals(Map(), checkpoint.read())
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
new file mode 100644
index 0000000..a09e1cc
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -0,0 +1,410 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server.epoch
+
+import java.io.{File, RandomAccessFile}
+import java.util
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.api.KAFKA_0_11_0_IV1
+import kafka.log.Log
+import kafka.server.KafkaConfig._
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.tools.DumpLogSegments
+import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.TestUtils._
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.serialization.Deserializer
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ListBuffer => Buffer}
+
+/**
+  * These tests were written to assert that the addition of leader epochs to the replication protocol fixes the problems
+  * described in KIP-101. There is a boolean KIP_101_ENABLED which can be toggled to demonstrate the tests failing in the pre-KIP-101 case.
+  *
+  * https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
+  *
+  * A test which validates the end-to-end workflow is also included.
+  */
+class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness with Logging {
+
+  val topic = "topic1"
+  val msg = new Array[Byte](1000)
+  val msgBigger = new Array[Byte](10000)
+  var brokers: Seq[KafkaServer] = null
+  var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
+  var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = null
+
+  val KIP_101_ENABLED = true
+
+  @Before
+  override def setUp() {
+    super.setUp()
+  }
+
+  @After
+  override def tearDown() {
+    brokers.par.foreach(_.shutdown())
+    producer.close()
+    super.tearDown()
+  }
+
+  @Test
+  def shouldFollowLeaderEpochBasicWorkflow(): Unit = {
+
+    //Given 2 brokers
+    brokers = (100 to 101).map(createBroker(_))
+
+    //A single partition topic with 2 replicas
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
+    producer = createProducer()
+    val tp = new TopicPartition(topic, 0)
+
+    //When one record is written to the leader
+    producer.send(new ProducerRecord(topic, 0, null, msg)).get
+
+    //The message should have epoch 0 stamped onto it in both leader and follower
+    assertEquals(0, latestRecord(leader).partitionLeaderEpoch())
+    assertEquals(0, latestRecord(follower).partitionLeaderEpoch())
+
+    //Both leader and follower should have recorded Epoch 0 at Offset 0
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries())
+
+    //Bounce the follower
+    bounce(follower)
+    awaitISR(tp)
+
+    //Nothing happens yet as we haven't sent any new messages.
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries())
+
+    //Send a message
+    producer.send(new ProducerRecord(topic, 0, null, msg)).get
+
+    //Epoch 1 should now propagate to the follower with the written message
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries())
+
+    //The new message should have epoch 1 stamped
+    assertEquals(1, latestRecord(leader).partitionLeaderEpoch())
+    assertEquals(1, latestRecord(follower).partitionLeaderEpoch())
+
+    //Bounce the leader. Epoch -> 2
+    bounce(leader)
+    awaitISR(tp)
+
+    //Epoch 2 should be added to the leader, but not to the follower (yet), as there has been no replication.
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries())
+
+    //Send a message
+    producer.send(new ProducerRecord(topic, 0, null, msg)).get
+
+    //This should cause epoch 2 to propagate to the follower
+    assertEquals(2, latestRecord(leader).partitionLeaderEpoch())
+    assertEquals(2, latestRecord(follower).partitionLeaderEpoch())
+
+    //The leader epoch files should now match on leader and follower
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries())
+    assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries())
+  }
+
+  @Test
+  def shouldNotAllowDivergentLogs(): Unit = {
+
+    //Given two brokers
+    brokers = (100 to 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+
+    //A single partition topic with 2 replicas
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+      0 -> Seq(100, 101)
+    ))
+    producer = createProducer()
+
+    //Write 10 messages
+    (0 until 10).foreach { i =>
+      producer.send(new ProducerRecord(topic, 0, null, msg))
+      producer.flush()
+    }
+
+    //Stop the brokers
+    brokers.foreach { b => b.shutdown() }
+
+    //Delete the clean shutdown file to simulate crash
+    new File(brokers(0).config.logDirs(0), Log.CleanShutdownFile).delete()
+
+    //Delete 5 messages from the leader's log on 100
+    deleteMessagesFromLogFile(5 * msg.length, brokers(0), 0)
+
+    //Restart broker 100
+    brokers(0).startup()
+
+    //Bounce the producer (this is required, although I'm not sure why)
+    producer.close()
+    producer = createProducer()
+
+    //Write ten larger messages (so we can easily distinguish between messages written in the two phases)
+    (0 until 10).foreach { _ =>
+      producer.send(new ProducerRecord(topic, 0, null, msgBigger))
+      producer.flush()
+    }
+
+    //Start broker 101
+    brokers(1).startup()
+
+    //Wait for replication to resync
+    waitForLogsToMatch(brokers(0), brokers(1))
+
+    assertEquals("Log files should match Broker0 vs Broker 1", getLogFile(brokers(0), 0).length, getLogFile(brokers(1), 0).length)
+  }
+
+  //We can reproduce the pre-KIP-101 failure of this test by setting KafkaConfig.InterBrokerProtocolVersionProp = KAFKA_0_11_0_IV1
+  @Test
+  def offsetsShouldNotGoBackwards(): Unit = {
+
+    //Given two brokers
+    brokers = (100 to 101).map(createBroker(_))
+
+    //A single partition topic with 2 replicas
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+      0 -> Seq(100, 101)
+    ))
+    producer = bufferingProducer()
+
+    //Write 100 messages
+    (0 until 100).foreach { i =>
+      producer.send(new ProducerRecord(topic, 0, null, msg))
+      producer.flush()
+    }
+
+    //Stop the brokers
+    brokers.foreach { b => b.shutdown() }
+
+    //Delete the clean shutdown file to simulate crash
+    new File(brokers(0).config.logDirs(0), Log.CleanShutdownFile).delete()
+
+    //Delete half the messages from the log file
+    deleteMessagesFromLogFile(getLogFile(brokers(0), 0).length() / 2, brokers(0), 0)
+
+    //Start broker 100 again
+    brokers(0).startup()
+
+    //Bounce the producer (this is required, although I'm not sure why)
+    producer.close()
+    producer = bufferingProducer()
+
+    //Write two large batches of messages. This will ensure that the LEO of the follower's log aligns with the middle
+    //of a compressed message set on the leader (which, when forwarded, would result in offsets going backwards)
+    (0 until 77).foreach { _ =>
+      producer.send(new ProducerRecord(topic, 0, null, msg))
+    }
+    producer.flush()
+    (0 until 77).foreach { _ =>
+      producer.send(new ProducerRecord(topic, 0, null, msg))
+    }
+    producer.flush()
+
+    printSegments()
+
+    //Start broker 101. When it comes up it should read a whole batch of messages from the leader.
+    //As the chronology is lost we would end up with non-monotonic offsets (pre-KIP-101)
+    brokers(1).startup()
+
+    //Wait for replication to resync
+    waitForLogsToMatch(brokers(0), brokers(1))
+
+    printSegments()
+
+    //Shut down broker 100, so we read from broker 101, whose log would have been corrupted pre-KIP-101
+    brokers(0).shutdown()
+
+    //Search to see if we have non-monotonic offsets in the log
+    startConsumer()
+    val records = consumer.poll(1000).asScala
+    var prevOffset = -1L
+    records.foreach { r =>
+      assertTrue(s"Offset $prevOffset came before ${r.offset} ", r.offset > prevOffset)
+      prevOffset = r.offset
+    }
+
+    //Are the files identical?
+    assertEquals("Log files should match Broker0 vs Broker 1", getLogFile(brokers(0), 0).length, getLogFile(brokers(1), 0).length)
+  }
+
+  /**
+    * Unlike the tests above, this test doesn't fail prior to the Leader Epoch Change. I was unable to find a deterministic
+    * method for recreating the fast leader change bug.
+    */
+  @Test
+  def shouldSurviveFastLeaderChange(): Unit = {
+    val tp = new TopicPartition(topic, 0)
+
+    //Given 2 brokers
+    brokers = (100 to 101).map(createBroker(_))
+
+    //A single partition topic with 2 replicas
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
+    producer = createProducer()
+
+    //Kick off with a single record
+    producer.send(new ProducerRecord(topic, 0, null, msg)).get
+    var messagesWritten = 1
+
+    //Now invoke the fast leader change bug
+    (0 until 5).foreach { i =>
+      val leaderId = zkUtils.getLeaderForPartition(topic, 0).get
+      val leader = brokers.filter(_.config.brokerId == leaderId)(0)
+      val follower = brokers.filter(_.config.brokerId != leaderId)(0)
+
+      producer.send(new ProducerRecord(topic, 0, null, msg)).get
+      messagesWritten += 1
+
+      //As soon as it replicates, bounce the follower
+      bounce(follower)
+
+      log(leader, follower)
+      awaitISR(tp)
+
+      //Then bounce the leader
+      bounce(leader)
+
+      log(leader, follower)
+      awaitISR(tp)
+
+      //Ensure no data was lost
+      assertTrue(brokers.forall { broker => getLog(broker, 0).logEndOffset == messagesWritten })
+    }
+  }
+
+  private def log(leader: KafkaServer, follower: KafkaServer): Unit = {
+    info(s"Bounce complete for follower ${follower.config.brokerId}")
+    info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries())
+    info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries())
+  }
+
+  private def waitForLogsToMatch(b1: KafkaServer, b2: KafkaServer, partition: Int = 0): Unit = {
+    TestUtils.waitUntilTrue(() => {getLog(b1, partition).logEndOffset == getLog(b2, partition).logEndOffset}, "Logs didn't match.")
+  }
+
+  private def printSegments(): Unit = {
+    info("Broker0:")
+    DumpLogSegments.main(Seq("--files", getLogFile(brokers(0), 0).getCanonicalPath).toArray)
+    info("Broker1:")
+    DumpLogSegments.main(Seq("--files", getLogFile(brokers(1), 0).getCanonicalPath).toArray)
+  }
+
+  private def startConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
+    val consumerConfig = new Properties()
+    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(brokers))
+    consumerConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(getLogFile(brokers(1), 0).length() * 2))
+    consumerConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(getLogFile(brokers(1), 0).length() * 2))
+    consumer = new KafkaConsumer(consumerConfig, new StubDeserializer, new StubDeserializer)
+    consumer.assign(List(new TopicPartition(topic, 0)).asJava)
+    consumer.seek(new TopicPartition(topic, 0), 0)
+    consumer
+  }
+
+  private def deleteMessagesFromLogFile(bytes: Long, broker: KafkaServer, partitionId: Int): Unit = {
+    val logFile = getLogFile(broker, partitionId)
+    val writable = new RandomAccessFile(logFile, "rwd")
+    writable.setLength(logFile.length() - bytes)
+    writable.close()
+  }
+
+  private def bufferingProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
+    createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = -1, lingerMs = 10000,
+      props = Option(CoreUtils.propsWith(
+        (ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(msg.length * 1000))
+        , (ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
+      )))
+  }
+
+  private def getLogFile(broker: KafkaServer, partition: Int): File = {
+    val log: Log = getLog(broker, partition)
+    log.flush()
+    log.dir.listFiles.filter(_.getName.endsWith(".log"))(0)
+  }
+
+  private def getLog(broker: KafkaServer, partition: Int): Log = {
+    broker.logManager.logsByTopicPartition.get(new TopicPartition(topic, partition)).get
+  }
+
+  private def bounce(follower: KafkaServer): Unit = {
+    follower.shutdown()
+    follower.startup()
+    producer.close()
+    producer = createProducer() //TODO not sure why we need to recreate the producer, but it doesn't reconnect if we don't
+  }
+
+  private def epochCache(broker: KafkaServer): LeaderEpochFileCache = {
+    getLog(broker, 0).leaderEpochCache.asInstanceOf[LeaderEpochFileCache]
+  }
+
+  private def latestRecord(leader: KafkaServer, offset: Int = -1, partition: Int = 0): RecordBatch = {
+    getLog(leader, partition).activeSegment.read(0, None, Integer.MAX_VALUE)
+      .records.batches().asScala.toSeq.last
+  }
+
+  private def awaitISR(tp: TopicPartition): Boolean = {
+    TestUtils.waitUntilTrue(() => {
+      leader.replicaManager.getReplicaOrException(tp).partition.inSyncReplicas.map(_.brokerId).size == 2
+    }, "")
+  }
+
+  private def createProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
+    createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = -1)
+  }
+
+  private def leader(): KafkaServer = {
+    assertEquals(2, brokers.size)
+    val leaderId = zkUtils.getLeaderForPartition(topic, 0).get
+    brokers.filter(_.config.brokerId == leaderId)(0)
+  }
+
+  private def follower(): KafkaServer = {
+    assertEquals(2, brokers.size)
+    val leader = zkUtils.getLeaderForPartition(topic, 0).get
+    brokers.filter(_.config.brokerId != leader)(0)
+  }
+
+  private def createBroker(id: Int): KafkaServer = {
+    val config = createBrokerConfig(id, zkConnect)
+    if(!KIP_101_ENABLED) {
+      config.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_0_11_0_IV1.version)
+      config.setProperty(KafkaConfig.LogMessageFormatVersionProp, KAFKA_0_11_0_IV1.version)
+    }
+    createServer(fromProps(config))
+  }
+
+  private class StubDeserializer extends Deserializer[Array[Byte]] {
+    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}
+
+    override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = { data }
+
+    override def close(): Unit = {}
+  }
+}
\ No newline at end of file
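
The acceptance tests above all hinge on the rule KIP-101 introduces: when a replica becomes a follower it no longer truncates to its own high watermark, but asks the leader for the end offset of the follower's latest leader epoch and truncates there (bounded by its own log end offset). A rough, self-contained model of that decision; the names below are illustrative and are not Kafka's classes:

    // Leader epoch history entry: the epoch and the offset of its first message
    case class Epoch(epoch: Int, startOffset: Long)

    object TruncationSketch extends App {
      // Leader's history: epoch 0 began at offset 0, epoch 1 at 1, epoch 2 at 2
      val leaderEpochs = Seq(Epoch(0, 0), Epoch(1, 1), Epoch(2, 2))

      // End offset for a requested epoch = start offset of the next higher epoch,
      // or the leader's LEO if the requested epoch is the latest one
      def leaderEndOffsetFor(requested: Int, leaderLeo: Long): Long =
        leaderEpochs.find(_.epoch > requested).map(_.startOffset).getOrElse(leaderLeo)

      // A follower that crashed while still on epoch 1, with 4 messages in its log
      val followerLatestEpoch = 1
      val followerLeo = 4L

      // Post-KIP-101 the follower truncates to the leader's answer; pre-KIP-101 it
      // truncated to its high watermark, which is what the tests above demonstrate breaking
      val truncateTo = math.min(leaderEndOffsetFor(followerLatestEpoch, leaderLeo = 3), followerLeo)
      println(s"follower truncates from LEO $followerLeo to offset $truncateTo")  // 2
    }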

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
new file mode 100644
index 0000000..1a24c34
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -0,0 +1,721 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server.epoch
+import java.io.File
+
+import kafka.server.LogOffsetMetadata
+import kafka.server.checkpoints.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
+import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Unit test for the LeaderEpochFileCache.
+  */
+class LeaderEpochFileCacheTest {
+  val tp = new TopicPartition("TestTopic", 5)
+  var checkpoint: LeaderEpochCheckpoint = _
+
+  @Test
+  def shouldAddEpochAndMessageOffsetToCache() = {
+    var leo = 0
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When
+    cache.assign(epoch = 2, offset = 10)
+    leo = 11
+
+    //Then
+    assertEquals(2, cache.latestUsedEpoch())
+    assertEquals(EpochEntry(2, 10), cache.epochEntries()(0))
+    assertEquals(11, cache.endOffsetFor(2)) //should match leo
+  }
+  
+  @Test
+  def shouldUpdateEpochWithLogEndOffset() = {
+    var leo = 0
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    leo = 9
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When
+    cache.cacheLatestEpoch(2)
+    cache.maybeAssignLatestCachedEpochToLeo()
+
+    //Then
+    assertEquals(2, cache.latestUsedEpoch())
+    assertEquals(EpochEntry(2, 9), cache.epochEntries()(0))
+  }
+
+  @Test
+  def shouldReturnLogEndOffsetIfLatestEpochRequested() = {
+    var leo = 0
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When just one epoch
+    cache.assign(epoch = 2, offset = 11)
+    cache.assign(epoch = 2, offset = 12)
+    leo = 14
+
+    //Then
+    assertEquals(14, cache.endOffsetFor(2))
+  }
+
+  @Test
+  def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given cache with some data on leader
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 11)
+    cache.assign(epoch = 3, offset = 12)
+
+    //When a follower (say, a bootstrapping one) sends a request for UNDEFINED_EPOCH
+    val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
+
+    //Then
+    assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor)
+  }
+
+  @Test
+  def shouldNotOverwriteLogEndOffsetForALeaderEpochOnceItHasBeenAssigned() = {
+    var leo = 0
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    leo = 9
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    cache.cacheLatestEpoch(2)
+    cache.maybeAssignLatestCachedEpochToLeo()
+
+    //When called again later
+    leo = 10
+    cache.cacheLatestEpoch(2)
+    cache.maybeAssignLatestCachedEpochToLeo()
+
+    //Then the offset should NOT have been updated
+    assertEquals(9, cache.epochEntries()(0).startOffset)
+  }
+
+  @Test
+  def shouldAllowLeaderEpochToChangeEvenIfOffsetDoesNot() = {
+    var leo = 0
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    leo = 9
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    cache.cacheLatestEpoch(2)
+    cache.maybeAssignLatestCachedEpochToLeo()
+
+    //When update epoch with same leo
+    cache.cacheLatestEpoch(3)
+    cache.maybeAssignLatestCachedEpochToLeo()
+
+    //Then the offset should NOT have been updated
+    assertEquals(9, cache.endOffsetFor(3))
+    assertEquals(9, cache.endOffsetFor(2))
+    assertEquals(3, cache.latestUsedEpoch())
+  }
+  
+  @Test
+  def shouldNotOverwriteOffsetForALeaderEpochOnceItHasBeenAssigned() = {
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => new LogOffsetMetadata(0), checkpoint)
+    cache.assign(2, 6)
+
+    //When called again later with a greater offset
+    cache.assign(2, 10)
+
+    //Then later update should have been ignored
+    assertEquals(6, cache.epochEntries()(0).startOffset)
+  }
+
+  @Test
+  def shouldReturnUnsupportedIfNoEpochRecorded(){
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //Then
+    assertEquals(UNDEFINED_EPOCH, cache.latestUsedEpoch())
+    assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0))
+  }
+
+  @Test
+  def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    cache.assign(epoch = 5, offset = 11)
+    cache.assign(epoch = 6, offset = 12)
+    cache.assign(epoch = 7, offset = 13)
+
+    //When
+    val offset = cache.endOffsetFor(5 - 1)
+
+    //Then
+    assertEquals(UNDEFINED_EPOCH_OFFSET, offset)
+  }
+
+  @Test
+  def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = {
+    var leo = 0
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When several epochs
+    cache.assign(epoch = 1, offset = 11)
+    cache.assign(epoch = 1, offset = 12)
+    cache.assign(epoch = 2, offset = 13)
+    cache.assign(epoch = 2, offset = 14)
+    cache.assign(epoch = 3, offset = 15)
+    cache.assign(epoch = 3, offset = 16)
+    leo = 17
+
+    //Then get the start offset of the next epoch
+    assertEquals(15, cache.endOffsetFor(2))
+  }
+
+  @Test
+  def shouldReturnNextAvailableEpochIfThereIsNoExactEpochForTheOneRequested(){
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When
+    cache.assign(epoch = 0, offset = 10)
+    cache.assign(epoch = 2, offset = 13)
+    cache.assign(epoch = 4, offset = 17)
+
+    //Then
+    assertEquals(13, cache.endOffsetFor(requestedEpoch = 1))
+    assertEquals(17, cache.endOffsetFor(requestedEpoch = 2))
+  }
+
+  @Test
+  def shouldNotUpdateEpochAndStartOffsetIfItDidNotChange() = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 2, offset = 7)
+
+    //Then
+    assertEquals(1, cache.epochEntries.size)
+    assertEquals(EpochEntry(2, 6), cache.epochEntries.toList(0))
+  }
+
+  @Test
+  def shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked(): Unit = {
+    val leo = 100
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When
+    cache.cacheLatestEpoch(epoch = 2)
+    cache.maybeAssignLatestCachedEpochToLeo()
+
+    //Then
+    assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(3))
+  }
+
+  @Test
+  def shouldSupportEpochsThatDoNotStartFromZero(): Unit = {
+    var leo = 0
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When
+    cache.assign(epoch = 2, offset = 6)
+    leo = 7
+
+    //Then
+    assertEquals(leo, cache.endOffsetFor(2))
+    assertEquals(1, cache.epochEntries.size)
+    assertEquals(EpochEntry(2, 6), cache.epochEntries()(0))
+  }
+
+  @Test
+  def shouldPersistEpochsBetweenInstances(){
+    def leoFinder() = new LogOffsetMetadata(0)
+    val checkpointPath = TestUtils.tempFile().getAbsolutePath
+    checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+
+    //When
+    val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
+    val cache2 = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint2)
+
+    //Then
+    assertEquals(1, cache2.epochEntries.size)
+    assertEquals(EpochEntry(2, 6), cache2.epochEntries.toList(0))
+  }
+
+  @Test
+  def shouldNotLetEpochGoBackwardsEvenIfMessageEpochsDo(): Unit = {
+    var leo = 0
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //Given
+    cache.assign(epoch = 1, offset = 5); leo = 6
+    cache.assign(epoch = 2, offset = 6); leo = 7
+
+    //When we update an epoch in the past with an earlier offset
+    cache.assign(epoch = 1, offset = 7); leo = 8
+
+    //Then epoch should not be changed
+    assertEquals(2, cache.latestUsedEpoch())
+
+    //Then end offset for epoch 1 shouldn't have changed
+    assertEquals(6, cache.endOffsetFor(1))
+
+    //Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't think of a better option)
+    assertEquals(8, cache.endOffsetFor(2))
+
+    //Epoch history shouldn't have changed
+    assertEquals(EpochEntry(1, 5), cache.epochEntries()(0))
+    assertEquals(EpochEntry(2, 6), cache.epochEntries()(1))
+  }
+
+  @Test
+  def shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress() = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When epoch goes forward but offset goes backwards
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 5)
+
+    //Then latter assign should be ignored
+    assertEquals(EpochEntry(2, 6), cache.epochEntries.toList(0))
+  }
+
+  @Test
+  def shouldIncreaseAndTrackEpochsAsLeadersChangeManyTimes(): Unit = {
+    var leo = 0
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.cacheLatestEpoch(epoch = 0) //leo=0
+    cache.maybeAssignLatestCachedEpochToLeo()
+
+    //When
+    cache.cacheLatestEpoch(epoch = 1) //leo=0
+    cache.maybeAssignLatestCachedEpochToLeo()
+
+    //Then epoch should go up
+    assertEquals(1, cache.latestUsedEpoch())
+    //offset for 1 should still be 0
+    assertEquals(0, cache.endOffsetFor(1))
+    //offset for 0 should be the start offset of epoch(1) => 0
+    assertEquals(0, cache.endOffsetFor(0))
+
+    //When we write 5 messages as epoch 1
+    leo = 5
+
+    //Then end offset for epoch(1) should be leo => 5
+    assertEquals(5, cache.endOffsetFor(1))
+    //Epoch(0) should still show the start offset for Epoch(1) => 0
+    assertEquals(0, cache.endOffsetFor(0))
+
+    //When
+    cache.cacheLatestEpoch(epoch = 2) //leo=5
+    cache.maybeAssignLatestCachedEpochToLeo()
+    leo = 10 //write another 5 messages
+
+    //Then end offset for epoch(2) should be leo => 10
+    assertEquals(10, cache.endOffsetFor(2))
+
+    //end offset for epoch(1) should be the start offset of epoch(2) => 5
+    assertEquals(5, cache.endOffsetFor(1))
+
+    //epoch (0) should still be 0
+    assertEquals(0, cache.endOffsetFor(0))
+  }
+
+  @Test
+  def shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages(): Unit = {
+    var leo = 0
+    def leoFinder() = new LogOffsetMetadata(leo)
+
+    //When a new cache is created
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When Messages come in
+    cache.assign(epoch = 0, offset = 0); leo = 1
+    cache.assign(epoch = 0, offset = 1); leo = 2
+    cache.assign(epoch = 0, offset = 2); leo = 3
+
+    //Then epoch should stay, offsets should grow
+    assertEquals(0, cache.latestUsedEpoch())
+    assertEquals(leo, cache.endOffsetFor(0))
+
+    //When messages arrive with greater epoch
+    cache.assign(epoch = 1, offset = 3); leo = 4
+    cache.assign(epoch = 1, offset = 4); leo = 5
+    cache.assign(epoch = 1, offset = 5); leo = 6
+
+    assertEquals(1, cache.latestUsedEpoch())
+    assertEquals(leo, cache.endOffsetFor(1))
+
+    //When
+    cache.assign(epoch = 2, offset = 6); leo = 7
+    cache.assign(epoch = 2, offset = 7); leo = 8
+    cache.assign(epoch = 2, offset = 8); leo = 9
+
+    assertEquals(2, cache.latestUsedEpoch())
+    assertEquals(leo, cache.endOffsetFor(2))
+
+    //Older epochs should return the start offset of the first message in the subsequent epoch.
+    assertEquals(3, cache.endOffsetFor(0))
+    assertEquals(6, cache.endOffsetFor(1))
+  }
+
+  @Test
+  def shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When clear latest on epoch boundary
+    cache.clearLatest(offset = 8)
+
+    //Then should remove two latest epochs (remove is inclusive)
+    assertEquals(ListBuffer(EpochEntry(2, 6)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldPreserveResetOffsetOnClearEarliestIfOneExists(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When reset to offset ON epoch boundary
+    cache.clearEarliest(offset = 8)
+
+    //Then should preserve (3, 8)
+    assertEquals(ListBuffer(EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When reset to offset BETWEEN epoch boundaries
+    cache.clearEarliest(offset = 9)
+
+    //Then we should retain epoch 3, but update its offset to 9 as 8 has been removed
+    assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldNotClearAnythingIfOffsetToEarly(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When reset to offset before first epoch offset
+    cache.clearEarliest(offset = 1)
+
+    //Then nothing should change
+    assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldNotClearAnythingIfOffsetToFirstOffset(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When reset to offset on earliest epoch boundary
+    cache.clearEarliest(offset = 6)
+
+    //Then nothing should change
+    assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldRetainLatestEpochOnClearAllEarliest(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When
+    cache.clearEarliest(offset = 11)
+
+    //Then retain the last
+    assertEquals(ListBuffer(EpochEntry(4, 11)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When we clear from a position between offset 8 & offset 11
+    cache.clearEarliest(offset = 9)
+
+    //Then we should update the middle epoch entry's offset
+    assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 0, offset = 0)
+    cache.assign(epoch = 1, offset = 7)
+    cache.assign(epoch = 2, offset = 10)
+
+    //When we clear from a position between offset 0 & offset 7
+    cache.clearEarliest(offset = 5)
+
+    //Then we should keep epoch 0 but update the offset appropriately
+    assertEquals(ListBuffer(EpochEntry(0,5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When reset to offset beyond last epoch
+    cache.clearEarliest(offset = 15)
+
+    //Then update the last
+    assertEquals(ListBuffer(EpochEntry(4, 15)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldDropEntriesBetweenEpochBoundaryWhenRemovingNewest(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When reset to offset BETWEEN epoch boundaries
+    cache.clearLatest(offset = 9)
+
+    //Then should keep the preceding epochs
+    assertEquals(3, cache.latestUsedEpoch())
+    assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), cache.epochEntries)
+  }
+
+  @Test
+  def shouldClearAllEntries(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When 
+    cache.clear()
+
+    //Then 
+    assertEquals(0, cache.epochEntries.size)
+  }
+
+  @Test
+  def shouldNotResetEpochHistoryHeadIfUndefinedPassed(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When clearLatest is called with an undefined offset
+    cache.clearLatest(offset = UNDEFINED_EPOCH_OFFSET)
+
+    //Then should do nothing
+    assertEquals(3, cache.epochEntries.size)
+  }
+
+  @Test
+  def shouldNotResetEpochHistoryTailIfUndefinedPassed(): Unit = {
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //Given
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(epoch = 2, offset = 6)
+    cache.assign(epoch = 3, offset = 8)
+    cache.assign(epoch = 4, offset = 11)
+
+    //When clearEarliest is called with an undefined offset
+    cache.clearEarliest(offset = UNDEFINED_EPOCH_OFFSET)
+
+    //Then should do nothing
+    assertEquals(3, cache.epochEntries.size)
+  }
+
+  @Test
+  def shouldFetchLatestEpochOfEmptyCache(): Unit = {
+    //Given
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //When
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //Then
+    assertEquals(-1, cache.latestUsedEpoch)
+  }
+
+  @Test
+  def shouldFetchEndOffsetOfEmptyCache(): Unit = {
+    //Given
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //When
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //Then
+    assertEquals(-1, cache.endOffsetFor(7))
+  }
+
+  @Test
+  def shouldClearEarliestOnEmptyCache(): Unit = {
+    //Given
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //When
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //Then
+    cache.clearEarliest(7)
+  }
+
+  @Test
+  def shouldClearLatestOnEmptyCache(): Unit = {
+    //Given
+    def leoFinder() = new LogOffsetMetadata(0)
+
+    //When
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //Then
+    cache.clearLatest(7)
+  }
+
+  @Test
+  def shouldUpdateEpochCacheOnLeadershipChangeThenCommit(): Unit ={
+    //Given
+    def leoFinder() = new LogOffsetMetadata(5)
+    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+
+    //When
+    cache.cacheLatestEpoch(2)
+
+    //Then
+    assertEquals(UNDEFINED_EPOCH, cache.latestUsedEpoch())
+
+    //When
+    cache.maybeAssignLatestCachedEpochToLeo()
+
+    //Then should have saved epoch
+    assertEquals(2, cache.latestUsedEpoch())
+
+    //Then should have applied LEO to epoch
+    assertEquals(5, cache.endOffsetFor(2))
+  }
+
+  @Before
+  def setUp() {
+    checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())
+  }
+}
\ No newline at end of file
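
A large group of the tests above pins down how the epoch cache is trimmed when the log changes: clearLatest drops entries from a given offset upwards (inclusive), while clearEarliest drops wholly earlier entries but keeps the epoch spanning the clear point, moving its start offset forward. A rough pure-Scala model of just that trimming behavior, where Entry and the two functions are illustrative rather than the Kafka implementation, reproducing the expectations asserted above:

    case class Entry(epoch: Int, startOffset: Long)

    object EpochTrimSketch extends App {
      val entries = Seq(Entry(2, 6), Entry(3, 8), Entry(4, 11))

      // Log truncated from the tail: drop entries starting at or after the offset
      def clearLatest(es: Seq[Entry], offset: Long): Seq[Entry] =
        es.filter(_.startOffset < offset)

      // Log start advanced: drop wholly earlier entries, but keep the epoch that
      // spans the offset, moving its start offset up to the new log start
      def clearEarliest(es: Seq[Entry], offset: Long): Seq[Entry] = {
        val (before, after) = es.partition(_.startOffset <= offset)
        before.lastOption match {
          case Some(spanning) if spanning.startOffset < offset => spanning.copy(startOffset = offset) +: after
          case Some(spanning) => spanning +: after
          case None => after
        }
      }

      println(clearLatest(entries, 8))    // List(Entry(2,6)), as in clearLatest(offset = 8)
      println(clearEarliest(entries, 9))  // List(Entry(3,9), Entry(4,11)), as in clearEarliest(offset = 9)
      println(clearEarliest(entries, 6))  // unchanged, as asserted for a clear on the earliest boundary
    }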

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
new file mode 100644
index 0000000..c5bb5e4
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -0,0 +1,283 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server.epoch
+
+import java.util.{Map => JMap}
+
+import kafka.admin.AdminUtils
+import kafka.server.KafkaConfig._
+import kafka.server.{BlockingSend, KafkaConfig, KafkaServer, ReplicaFetcherBlockingSend}
+import kafka.utils.TestUtils._
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.kafka.common.utils.SystemTime
+import org.apache.kafka.common.TopicPartition
+
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
+
+import scala.collection.JavaConverters._
+import scala.collection.Map
+
+class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
+  var brokers: Seq[KafkaServer] = null
+  val topic1 = "foo"
+  val topic2 = "bar"
+  val t1p0 = new TopicPartition(topic1, 0)
+  val t1p1 = new TopicPartition(topic1, 1)
+  val t1p2 = new TopicPartition(topic1, 2)
+  val t2p0 = new TopicPartition(topic2, 0)
+  val t2p2 = new TopicPartition(topic2, 2)
+  val tp = t1p0
+  var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val props = createBrokerConfigs(2, zkConnect)
+    brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_))
+  }
+
+  @After
+  override def tearDown() {
+    brokers.foreach(_.shutdown())
+    if (producer != null)
+      producer.close()
+    super.tearDown()
+  }
+
+  @Test
+  def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
+    // Given two topics with replication of a single partition
+    for (topic <- List(topic1, topic2)) {
+      createTopic(zkUtils, topic, Map(0 -> Seq(0, 1)), servers = brokers)
+    }
+
+    // When we send four messages
+    sendFourMessagesToEachTopic()
+
+    //Then they should be stamped with Leader Epoch 0
+    var expectedLeaderEpoch = 0
+    waitUntilTrue(() => messagesHaveLeaderEpoch(brokers(0), expectedLeaderEpoch, 0), "Leader epoch should be 0")
+
+    //Given we then bounce the leader
+    brokers(0).shutdown()
+    brokers(0).startup()
+
+    //Then LeaderEpoch should now have changed from 0 -> 1
+    expectedLeaderEpoch = 1
+    waitForEpochChangeTo(topic1, 0, expectedLeaderEpoch)
+    waitForEpochChangeTo(topic2, 0, expectedLeaderEpoch)
+
+    //Given we now send messages
+    sendFourMessagesToEachTopic()
+
+    //The new messages should be stamped with LeaderEpoch = 1
+    waitUntilTrue(() => messagesHaveLeaderEpoch(brokers(0), expectedLeaderEpoch, 4), "Leader epoch should be 1")
+  }
+
+  @Test
+  def shouldSendLeaderEpochRequestAndGetAResponse(): Unit = {
+
+    //3 brokers, put partition on 100/101 and then pretend to be 102
+    brokers = (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic1, Map(
+      0 -> Seq(100),
+      1 -> Seq(101)
+    ))
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic2, Map(
+      0 -> Seq(100)
+    ))
+
+    //Send a different number of messages to each partition so their end offsets are distinguishable
+    producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = -1)
+    (0 until 10).foreach { _ =>
+      producer.send(new ProducerRecord(topic1, 0, null, "IHeartLogs".getBytes))
+    }
+    (0 until 20).foreach { _ =>
+      producer.send(new ProducerRecord(topic1, 1, null, "OhAreThey".getBytes))
+    }
+    (0 until 30).foreach { _ =>
+      producer.send(new ProducerRecord(topic2, 0, null, "IReallyDo".getBytes))
+    }
+    producer.flush()
+
+    val fetcher0 = new TestFetcherThread(sender(from = brokers(2), to = brokers(0)))
+    val epochsRequested = Map(t1p0 -> 0, t1p1 -> 0, t2p0 -> 0, t2p2 -> 0)
+
+    //When
+    val offsetsForEpochs = fetcher0.leaderOffsetsFor(epochsRequested)
+
+    //Then end offset should be correct
+    assertEquals(10, offsetsForEpochs(t1p0).endOffset)
+    assertEquals(30, offsetsForEpochs(t2p0).endOffset)
+
+    //And we should get an UNKNOWN_TOPIC_OR_PARTITION error for t1p1 (as it's not on broker 0)
+    assertTrue(offsetsForEpochs(t1p1).hasError)
+    assertEquals(UNKNOWN_TOPIC_OR_PARTITION, offsetsForEpochs(t1p1).error)
+    assertEquals(UNDEFINED_EPOCH_OFFSET, offsetsForEpochs(t1p1).endOffset)
+
+    //Repointing to broker 1 we should get the correct offset for t1p1
+    val fetcher1 = new TestFetcherThread(sender(from = brokers(2), to = brokers(1)))
+    val offsetsForEpochs1 = fetcher1.leaderOffsetsFor(epochsRequested)
+    assertEquals(20, offsetsForEpochs1(t1p1).endOffset)
+  }
+
+  @Test
+  def shouldIncreaseLeaderEpochBetweenLeaderRestarts(): Unit = {
+
+    //Setup: we are only interested in the single partition on broker 101
+    brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, tp.topic, Map(tp.partition -> Seq(101)))
+    producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 10, acks = -1)
+
+    //1. Given a single message
+    producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
+    var fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
+
+    //Then the end offset for epoch 0 should be 1, which is the LEO
+    var offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
+    assertEquals(1, offset)
+    assertEquals(leo(), offset)
+
+
+    //2. When broker is bounced
+    brokers(1).shutdown()
+    brokers(1).startup()
+
+    producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
+    fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
+
+
+    //Then the end offset for epoch 0 should still be 1 (the start offset of the next epoch)
+    offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
+    assertEquals(1, offset)
+
+    //Then the end offset for epoch 2 should be the LEO (NB: the leader epoch goes up by 2 on each bounce, because the leader is first changed to -1 and then back to the live replica)
+    assertEquals(2, fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
+    assertEquals(leo(), fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
+
+
+    //3. When broker is bounced again
+    brokers(1).shutdown()
+    brokers(1).startup()
+
+    producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
+    fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
+
+
+    //Then Epoch 0 should still map to offset 1
+    assertEquals(1, fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset())
+
+    //Then Epoch 2 should still map to offset 2
+    assertEquals(2, fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
+
+    //Then Epoch 4 should map to offset 3, the LEO
+    assertEquals(3, fetcher.leaderOffsetsFor(Map(tp -> 4))(tp).endOffset())
+    assertEquals(leo(), fetcher.leaderOffsetsFor(Map(tp -> 4))(tp).endOffset())
+
+    //Adding some extra assertions here to save test setup.
+    shouldSupportRequestsForEpochsNotOnTheLeader(fetcher)
+  }
+
+  //Appended onto the previous test to save on setup cost.
+  def shouldSupportRequestsForEpochsNotOnTheLeader(fetcher: TestFetcherThread): Unit = {
+    /**
+      * Asking for an epoch that is not present on the leader should return the
+      * start offset of the next epoch the leader does have (i.e. the end offset of
+      * the requested epoch), or UNDEFINED_EPOCH_OFFSET if no later epoch exists.
+      */
+
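+    //Illustration (based on the state left by the restarts above, where the leader accrued epochs 0, 2 and 4):
+    //a request for epoch 1 resolves to the start offset of epoch 2 (1), epoch 3 resolves to the start offset
+    //of epoch 4 (2), and epoch 5 has no later epoch, so it resolves to UNDEFINED_EPOCH_OFFSET (-1).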
+    val epoch1 = Map(t1p0 -> 1)
+    assertEquals(1, fetcher.leaderOffsetsFor(epoch1)(t1p0).endOffset())
+
+    val epoch3 = Map(t1p0 -> 3)
+    assertEquals(2, fetcher.leaderOffsetsFor(epoch3)(t1p0).endOffset())
+
+    val epoch5 = Map(t1p0 -> 5)
+    assertEquals(-1, fetcher.leaderOffsetsFor(epoch5)(t1p0).endOffset())
+  }
+
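+  //Builds a blocking channel from the 'from' broker to the 'to' broker, as the replica fetcher thread does when contacting the leader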
+  private def sender(from: KafkaServer, to: KafkaServer): BlockingSend = {
+    val endPoint = from.metadataCache.getAliveBrokers.find(_.id == to.config.brokerId).get.getBrokerEndPoint(from.config.interBrokerListenerName)
+    new ReplicaFetcherBlockingSend(endPoint, from.config, new Metrics(), new SystemTime(), 42, "TestFetcher")
+  }
+
+  private def waitForEpochChangeTo(topic: String, partition: Int, epoch: Int): Boolean = {
+    TestUtils.waitUntilTrue(() => {
+      brokers(0).metadataCache.getPartitionInfo(topic, partition) match {
+        case Some(m) => m.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch == epoch
+        case None => false
+      }
+    }, "Epoch didn't change")
+  }
+
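+  //Checks that messages exist (leo > 0) and that every record batch at or beyond minOffset, in every segment on every broker, carries the expected leader epoch (for partition 0 of topic1 and topic2)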
+  private def messagesHaveLeaderEpoch(broker: KafkaServer, expectedLeaderEpoch: Int, minOffset: Int): Boolean = {
+    var result = true
+    for (topic <- List(topic1, topic2)) {
+      val tp = new TopicPartition(topic, 0)
+      val leo = broker.getLogManager().getLog(tp).get.logEndOffset
+      result = result && leo > 0 && brokers.forall { broker =>
+        broker.getLogManager().getLog(tp).get.logSegments.iterator.forall { segment =>
+          val readInfo = segment.read(minOffset, None, Integer.MAX_VALUE)
+          readInfo != null &&
+            readInfo.records.batches().iterator().asScala.forall(
+              expectedLeaderEpoch == _.partitionLeaderEpoch()
+            )
+        }
+      }
+    }
+    result
+  }
+
+  private def sendFourMessagesToEachTopic() = {
+    val testMessageList1 = List("test1", "test2", "test3", "test4")
+    val testMessageList2 = List("test5", "test6", "test7", "test8")
+    val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers), retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
+    val records =
+      testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++
+        testMessageList2.map(m => new ProducerRecord(topic2, m, m))
+    records.map(producer.send).foreach(_.get)
+    producer.close()
+  }
+
+  /**
+    * Simulates how the Replica Fetcher Thread requests leader offsets for epochs
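+    * Usage mirrors the tests above, e.g.
+    *   new TestFetcherThread(sender(from = brokers(0), to = brokers(1))).leaderOffsetsFor(Map(tp -> 0))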
+    */
+  private class TestFetcherThread(sender: BlockingSend) extends Logging {
+
+    def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
+      val request = new OffsetsForLeaderEpochRequest.Builder(toJavaFormat(partitions))
+      val response = sender.sendRequest(request)
+      response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
+    }
+
+    def toJavaFormat(partitions: Map[TopicPartition, Int]): JMap[TopicPartition, Integer] = {
+      partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
new file mode 100644
index 0000000..77b9068
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -0,0 +1,98 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server.epoch
+
+import kafka.server.OffsetsForLeaderEpoch
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.requests.EpochEndOffset
+import org.easymock.EasyMock._
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class OffsetsForLeaderEpochTest {
+
+  @Test
+  def shouldGetEpochsFromReplica(): Unit = {
+    val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager])
+    val replica = createNiceMock(classOf[kafka.cluster.Replica])
+    val cache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
+
+    //Given
+    val tp = new TopicPartition("topic", 1)
+    val offset = 42
+    val epochRequested: Integer = 5
+    val request = mutable.Map(tp -> epochRequested).asJava
+
+    //Stubs
+    expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
+    expect(replica.epochs).andReturn(Some(cache))
+    expect(cache.endOffsetFor(epochRequested)).andReturn(offset)
+    replay(replica, replicaManager, cache)
+
+    //When
+    val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request)
+
+    //Then
+    assertEquals(new EpochEndOffset(Errors.NONE, offset), response.get(tp))
+  }
+
+  @Test
+  def shouldReturnNotLeaderForPartitionIfThrown(): Unit = {
+    val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager])
+
+    //Given
+    val tp = new TopicPartition("topic", 1)
+    val epochRequested: Integer = 5
+    val request = mutable.Map(tp -> epochRequested).asJava
+
+    //Stubs
+    expect(replicaManager.getLeaderReplicaIfLocal(tp)).andThrow(new NotLeaderForPartitionException())
+    replay(replicaManager)
+
+    //When
+    val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request)
+
+    //Then
+    assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET), response.get(tp))
+  }
+
+  @Test
+  def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = {
+    val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager])
+
+    //Given
+    val tp = new TopicPartition("topic", 1)
+    val epochRequested: Integer = 5
+    val request = mutable.Map(tp -> epochRequested).asJava
+
+    //Stubs
+    expect(replicaManager.getLeaderReplicaIfLocal(tp)).andThrow(new UnknownTopicOrPartitionException())
+    replay(replicaManager)
+
+    //When
+    val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request)
+
+    //Then
+    assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET), response.get(tp))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
new file mode 100644
index 0000000..e04bd95
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -0,0 +1,80 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server.epoch.util
+
+import kafka.cluster.BrokerEndPoint
+import kafka.server.BlockingSend
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient}
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.AbstractRequest.Builder
+import org.apache.kafka.common.requests.FetchResponse.PartitionData
+import org.apache.kafka.common.requests.{AbstractRequest, EpochEndOffset, FetchResponse, OffsetsForLeaderEpochResponse}
+import org.apache.kafka.common.utils.{SystemTime, Time}
+
+/**
+  * Stub network client used for testing the ReplicaFetcher, wraps the MockClient used for consumer testing
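+  *
+  * A minimal usage sketch (the endpoint and offset values here are hypothetical, and the relevant
+  * requests/Errors imports plus JavaConverters are assumed):
+  * {{{
+  *   val tp = new TopicPartition("topic", 0)
+  *   val offsets = Map(tp -> new EpochEndOffset(Errors.NONE, 1)).asJava
+  *   val sender = new ReplicaFetcherMockBlockingSend(offsets, new BrokerEndPoint(1, "localhost", 9092), new SystemTime())
+  *   val response = sender.sendRequest(new OffsetsForLeaderEpochRequest.Builder(Map(tp -> (0: Integer)).asJava))
+  *   // sender.epochFetchCount is now 1 and the response body wraps the offsets supplied above
+  * }}}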
+  */
+class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset], destination: BrokerEndPoint, time: Time) extends BlockingSend {
+  private val client = new MockClient(new SystemTime)
+  var fetchCount = 0
+  var epochFetchCount = 0
+  var callback: Option[() => Unit] = None
+
+  def setEpochRequestCallback(postEpochFunction: () => Unit): Unit = {
+    callback = Some(postEpochFunction)
+  }
+
+  override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
+
+    //Send the request to the mock client
+    val clientRequest = request(requestBuilder)
+    client.send(clientRequest, time.milliseconds())
+
+    //Create a suitable response based on the API key
+    val response = requestBuilder.apiKey() match {
+      case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
+        callback match {
+          case Some(f) => f()
+          case None => //nothing
+        }
+        epochFetchCount += 1
+        new OffsetsForLeaderEpochResponse(offsets)
+
+      case ApiKeys.FETCH =>
+        fetchCount += 1
+        new FetchResponse(new java.util.LinkedHashMap[TopicPartition, PartitionData], 0)
+
+      case _ =>
+        throw new UnsupportedOperationException
+    }
+
+    //Use mock client to create the appropriate response object
+    client.respondFrom(response, new Node(destination.id, destination.host, destination.port))
+    client.poll(30, time.milliseconds()).iterator().next()
+  }
+
+  private def request(requestBuilder: Builder[_ <: AbstractRequest]): ClientRequest = {
+    client.newClientRequest(
+      destination.id.toString,
+      requestBuilder,
+      time.milliseconds(),
+      true)
+  }
+
+  override def close(): Unit = {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 8766855..3dbe2de 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -37,6 +37,7 @@ import kafka.producer._
 import kafka.security.auth.{Acl, Authorizer, Resource}
 import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
 import kafka.server._
+import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
 import kafka.utils.ZkUtils._
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
@@ -1079,7 +1080,7 @@ object TestUtils extends Logging {
     // ensure that topic is removed from all cleaner offsets
     TestUtils.waitUntilTrue(() => servers.forall(server => topicPartitions.forall { tp =>
       val checkpoints = server.getLogManager().logDirs.map { logDir =>
-        new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+        new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
       }
       checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
     }), "Cleaner offset for deleted partition should have been removed")

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
index 155eb7d..e5e140b 100644
--- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -93,7 +93,6 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
             self.kafka.stop_node(node)
             self.kafka.start_node(node)
 
-    @ignore
     @cluster(num_nodes=9)
     @matrix(security_protocol=["PLAINTEXT", "SSL", "SASL_SSL", "SASL_PLAINTEXT"])
     def test_zk_security_upgrade(self, security_protocol):

