kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6795; Added unit tests for ReplicaAlterLogDirsThread
Date Wed, 25 Apr 2018 21:25:33 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cbb5b51  KAFKA-6795; Added unit tests for ReplicaAlterLogDirsThread
cbb5b51 is described below

commit cbb5b51475368613c7972297ea6055a4f75285e1
Author: Anna Povzner <anna@confluent.io>
AuthorDate: Wed Apr 25 14:24:29 2018 -0700

    KAFKA-6795; Added unit tests for ReplicaAlterLogDirsThread
    
    Added unit tests for ReplicaAlterLogDirsThread, focused mostly on the log truncation
logic.
    
    Fixed ReplicaAlterLogDirsThread.buildLeaderEpochRequest() to use the future replica's latest
epoch (not the latest epoch of the replica it is fetching from). This follows the rule that the
offset-for-leader-epoch request should be based on the leader epoch of the follower (in this
case, the future local replica).
    
    Also fixed the PartitionFetchState constructor that takes an offset and a delay. The code
ignored the delay parameter and always used a delay of 0. This constructor is used only by another
constructor, which passes delay = 0, so the bug happened to have no visible effect.
    
    Author: Anna Povzner <anna@confluent.io>
    
    Reviewers: Dong Lin <lindong28@gmail.com>
    
    Closes #4918 from apovzner/kafka-6795
---
 .../scala/kafka/server/AbstractFetcherThread.scala |   2 +-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  15 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     | 526 +++++++++++++++++++++
 3 files changed, 539 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8d787c9..f919ddf 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -434,7 +434,7 @@ case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem,
truncating
 
   def this(offset: Long, truncatingLog: Boolean) = this(offset, new DelayedItem(0), truncatingLog)
 
-  def this(offset: Long, delay: DelayedItem) = this(offset, new DelayedItem(0), false)
+  def this(offset: Long, delay: DelayedItem) = this(offset, delay, false)
 
   def this(fetchOffset: Long) = this(fetchOffset, new DelayedItem(0))
 
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 48c83d4..0faf5dc 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -58,8 +58,6 @@ class ReplicaAlterLogDirsThread(name: String,
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
 
-  private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] =  replicaMgr.getReplica(tp).map(_.epochs.get)
-
   def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
     var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData)] = null
     val request = fetchRequest.underlying.build()
@@ -141,7 +139,13 @@ class ReplicaAlterLogDirsThread(name: String,
       delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
   }
 
+  /**
+   * Builds offset for leader epoch requests for partitions that are in the truncating phase,
+   * based on the latest epochs of the future replicas (the ones that are fetching)
+   */
   def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]):
ResultWithPartitions[Map[TopicPartition, Int]] = {
+    def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp,
Request.FutureLocalReplicaId).map(_.epochs.get)
+
     val partitionEpochOpts = allPartitions
       .filter { case (_, state) => state.isTruncatingLog }
       .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
@@ -152,6 +156,11 @@ class ReplicaAlterLogDirsThread(name: String,
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }
 
+  /**
+   * Fetches offset for leader epoch from local replica for each of the given topic partitions
+   * @param partitions map of topic partition -> leader epoch of the future replica
+   * @return map of topic partition -> end offset for a requested leader epoch
+   */
   def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset]
= {
     partitions.map { case (tp, epoch) =>
       try {
@@ -263,4 +272,4 @@ object ReplicaAlterLogDirsThread {
 
     override def toString = underlying.toString
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
new file mode 100644
index 0000000..a0f1dae
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -0,0 +1,526 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+
+import kafka.api.Request
+import kafka.cluster.{BrokerEndPoint, Replica, Partition}
+import kafka.log.LogManager
+import kafka.server.AbstractFetcherThread.ResultWithPartitions
+import kafka.server.FetchPartitionData
+import kafka.server.epoch.LeaderEpochCache
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, KafkaStorageException}
+import kafka.utils.{DelayedItem, TestUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchMetadata =>
JFetchMetadata}
+import org.apache.kafka.common.requests.FetchResponse.PartitionData
+import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH_OFFSET, UNDEFINED_EPOCH}
+import org.apache.kafka.common.utils.SystemTime
+import org.easymock.EasyMock._
+import org.easymock.{Capture, CaptureType, EasyMock, IAnswer}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+import scala.collection.{Map, mutable}
+
+class ReplicaAlterLogDirsThreadTest {
+
+  private val t1p0 = new TopicPartition("topic1", 0)
+  private val t1p1 = new TopicPartition("topic1", 1)
+
+  @Test
+  def issuesEpochRequestFromLocalReplica(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+    //Setup all dependencies
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val leaderEpoch = 2
+    val leo = 13
+
+    //Stubs
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(leo).anyTimes()
+    stub(replica, replica, futureReplica, partition, replicaManager)
+
+    replay(leaderEpochs, replicaManager, replica)
+
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = null,
+      brokerTopicStats = null)
+
+    val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> leaderEpoch))
+
+    val expected = Map(
+      t1p0 -> new EpochEndOffset(Errors.NONE, leo),
+      t1p1 -> new EpochEndOffset(Errors.NONE, leo)
+    )
+
+    assertEquals("results from leader epoch request should have offset from local replica",
+                 expected, result)
+  }
+
+  @Test
+  def fetchEpochsFromLeaderShouldHandleExceptionFromGetLocalReplica(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+    //Setup all dependencies
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val replica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val leaderEpoch = 2
+    val leo = 13
+
+    //Stubs
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(leo).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).anyTimes()
+    expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p1)).andThrow(new KafkaStorageException).once()
+    expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes()
+
+    replay(leaderEpochs, replicaManager, replica)
+
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = null,
+      brokerTopicStats = null)
+
+    val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> leaderEpoch))
+
+    val expected = Map(
+      t1p0 -> new EpochEndOffset(Errors.NONE, leo),
+      t1p1 -> new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH_OFFSET)
+    )
+
+    assertEquals(expected, result)
+  }
+
+  @Test
+  def shouldTruncateToReplicaOffset(): Unit = {
+
+    //Create a capture to track what partitions/offsets are truncated
+    val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
+
+    // Setup all the dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochsT1p0 = createMock(classOf[LeaderEpochCache])
+    val leaderEpochsT1p1 = createMock(classOf[LeaderEpochCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replicaT1p0 = createNiceMock(classOf[Replica])
+    val replicaT1p1 = createNiceMock(classOf[Replica])
+    // one future replica mock because our mocking methods return the same values for both
+    // future replicas
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+    val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] 
= EasyMock.newCapture()
+
+    val leaderEpoch = 2
+    val futureReplicaLEO = 191
+    val replicaT1p0LEO = 190
+    val replicaT1p1LEO = 192
+
+    //Stubs
+    expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
+    expect(replicaT1p0.epochs).andReturn(Some(leaderEpochsT1p0)).anyTimes()
+    expect(replicaT1p1.epochs).andReturn(Some(leaderEpochsT1p1)).anyTimes()
+    expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
+    expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
+    expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes()
+    expect(leaderEpochsT1p0.endOffsetFor(leaderEpoch)).andReturn(replicaT1p0LEO).anyTimes()
+    expect(leaderEpochsT1p1.endOffsetFor(leaderEpoch)).andReturn(replicaT1p1LEO).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stubWithFetchMessages(replicaT1p0, replicaT1p1, futureReplica, partition, replicaManager,
responseCallback)
+
+    replay(leaderEpochsT1p0, leaderEpochsT1p1, futureReplicaLeaderEpochs, replicaManager,
+           logManager, quotaManager, replicaT1p0, replicaT1p1, futureReplica, partition)
+
+    //Create the thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    //Run it
+    thread.doWork()
+
+    //We should have truncated to the offsets in the response
+    assertTrue(truncateToCapture.getValues.asScala.contains(replicaT1p0LEO))
+    assertTrue(truncateToCapture.getValues.asScala.contains(futureReplicaLEO))
+  }
+
+  @Test
+  def shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset(): Unit = {
+
+    //Create a capture to track what partitions/offsets are truncated
+    val truncated: Capture[Long] = newCapture(CaptureType.ALL)
+
+    // Setup all the dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+    val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] 
= EasyMock.newCapture()
+
+    val initialFetchOffset = 100
+    val futureReplicaLEO = 111
+
+    //Stubs
+    expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
+    expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
+
+    // pretend this is a completely new future replica, with no leader epochs recorded
+    expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).anyTimes()
+
+    // since UNDEFINED_EPOCH is -1, which will be lower than any valid leader epoch, the method
+    // will return UNDEFINED_EPOCH_OFFSET if the requested epoch is lower than the first epoch
+    // cached
+    expect(leaderEpochs.endOffsetFor(UNDEFINED_EPOCH)).andReturn(UNDEFINED_EPOCH_OFFSET).anyTimes()
+    stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback)
+    replay(replicaManager, logManager, quotaManager, leaderEpochs, futureReplicaLeaderEpochs,
+           replica, futureReplica, partition)
+
+    //Create the thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> initialFetchOffset))
+
+    //Run it
+    thread.doWork()
+
+    //We should have truncated to initial fetch offset
+    assertEquals("Expected future replica to truncate to initial fetch offset if replica
returns UNDEFINED_EPOCH_OFFSET",
+                 initialFetchOffset, truncated.getValue)
+  }
+
+  @Test
+  def shouldPollIndefinitelyIfReplicaNotAvailable(): Unit = {
+
+    //Create a capture to track what partitions/offsets are truncated
+    val truncated: Capture[Long] = newCapture(CaptureType.ALL)
+
+    // Setup all the dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val quotaManager = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[kafka.log.LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+    val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] 
= EasyMock.newCapture()
+
+    val futureReplicaLeaderEpoch = 1
+    val futureReplicaLEO = 290
+    val replicaLEO = 300
+
+    //Stubs
+    expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
+
+    expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(futureReplicaLeaderEpoch).anyTimes()
+    expect(leaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn(replicaLEO).anyTimes()
+
+    expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
+    expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes()
+    expect(replicaManager.getReplica(t1p0, Request.FutureLocalReplicaId)).andReturn(Some(futureReplica)).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p0, Request.FutureLocalReplicaId)).andReturn(futureReplica).anyTimes()
+    // this will cause fetchEpochsFromLeader return an error with undefined offset
+    expect(replicaManager.getReplicaOrException(t1p0)).andThrow(new ReplicaNotAvailableException("")).times(3)
+    expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).once()
+    expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    expect(replicaManager.fetchMessages(
+      EasyMock.anyLong(),
+      EasyMock.anyInt(),
+      EasyMock.anyInt(),
+      EasyMock.anyInt(),
+      EasyMock.anyObject(),
+      EasyMock.anyObject(),
+      EasyMock.anyObject(),
+      EasyMock.capture(responseCallback),
+      EasyMock.anyObject()))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          responseCallback.getValue.apply(Seq.empty[(TopicPartition, FetchPartitionData)])
+        }
+      }).anyTimes()
+
+    replay(leaderEpochs, futureReplicaLeaderEpochs, replicaManager, logManager, quotaManager,
+           replica, futureReplica, partition)
+
+    //Create the thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0))
+
+    // Run thread 3 times (exactly the number of times we mock an exception for getReplicaOrException)
+    (0 to 2).foreach { _ =>
+      thread.doWork()
+                     }
+
+    // Nothing happened since the replica was not available
+    assertEquals(0, truncated.getValues.size())
+
+    // Next time we loop, getReplicaOrException will return replica
+    thread.doWork()
+
+    // Now the final call should have actually done a truncation (to offset futureReplicaLEO)
+    assertEquals(futureReplicaLEO, truncated.getValue)
+  }
+
+  @Test
+  def shouldFetchLeaderEpochOnFirstFetchOnly(): Unit = {
+
+    //Setup all dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+    val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] 
= EasyMock.newCapture()
+
+    val leaderEpoch = 5
+    val futureReplicaLEO = 190
+    val replicaLEO = 213
+
+    //Stubs
+    expect(partition.truncateTo(futureReplicaLEO, true)).once()
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
+
+    expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
+    expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch)
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(replicaLEO)
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback)
+
+    replay(leaderEpochs, futureReplicaLeaderEpochs, replicaManager, logManager, quotaManager,
+           replica, futureReplica, partition)
+
+    //Create the fetcher thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0))
+
+    // loop a few times
+    (0 to 3).foreach { _ =>
+      thread.doWork()
+                     }
+
+    //Assert that truncateTo is called exactly once (despite more loops)
+    verify(partition)
+  }
+
+  @Test
+  def shouldFetchOneReplicaAtATime(): Unit = {
+
+    //Setup all dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    //Stubs
+    expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stub(replica, replica, futureReplica, partition, replicaManager)
+
+    replay(replicaManager, logManager, quotaManager, replica, futureReplica, partition)
+
+    //Create the fetcher thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    val ResultWithPartitions(fetchRequest, partitionsWithError) =
+      thread.buildFetchRequest(Seq((t1p0, new PartitionFetchState(150)), (t1p1, new PartitionFetchState(160))))
+
+    assertFalse(fetchRequest.isEmpty)
+    assertFalse(partitionsWithError.nonEmpty)
+    val request = fetchRequest.underlying.build()
+    assertEquals(0, request.minBytes)
+    val fetchInfos = request.fetchData.asScala.toSeq
+    assertEquals(1, fetchInfos.length)
+    assertEquals("Expected fetch request for largest partition", t1p1, fetchInfos.head._1)
+    assertEquals(160, fetchInfos.head._2.fetchOffset)
+  }
+
+  @Test
+  def shouldFetchNonDelayedAndNonTruncatingReplicas(): Unit = {
+
+    //Setup all dependencies
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager = createMock(classOf[LogManager])
+    val replica = createNiceMock(classOf[Replica])
+    val futureReplica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    //Stubs
+    expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    stub(replica, replica, futureReplica, partition, replicaManager)
+
+    replay(replicaManager, logManager, quotaManager, replica, futureReplica, partition)
+
+    //Create the fetcher thread
+    val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+    val thread = new ReplicaAlterLogDirsThread(
+      "alter-logs-dirs-thread-test1",
+      sourceBroker = endPoint,
+      brokerConfig = config,
+      replicaMgr = replicaManager,
+      quota = quotaManager,
+      brokerTopicStats = null)
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
+
+    // one partition is ready and one is truncating
+    val ResultWithPartitions(fetchRequest, partitionsWithError) =
+      thread.buildFetchRequest(Seq(
+        (t1p0, new PartitionFetchState(150)),
+        (t1p1, new PartitionFetchState(160, truncatingLog=true))))
+
+    assertFalse(fetchRequest.isEmpty)
+    assertFalse(partitionsWithError.nonEmpty)
+    val fetchInfos = fetchRequest.underlying.build().fetchData.asScala.toSeq
+    assertEquals(1, fetchInfos.length)
+    assertEquals("Expected fetch request for non-truncating partition", t1p0, fetchInfos.head._1)
+    assertEquals(150, fetchInfos.head._2.fetchOffset)
+
+    // one partition is ready and one is delayed
+    val ResultWithPartitions(fetchRequest2, partitionsWithError2) =
+      thread.buildFetchRequest(Seq(
+        (t1p0, new PartitionFetchState(140)),
+        (t1p1, new PartitionFetchState(160, delay=new DelayedItem(5000)))))
+
+    assertFalse(fetchRequest2.isEmpty)
+    assertFalse(partitionsWithError2.nonEmpty)
+    val fetchInfos2 = fetchRequest2.underlying.build().fetchData.asScala.toSeq
+    assertEquals(1, fetchInfos2.length)
+    assertEquals("Expected fetch request for non-delayed partition", t1p0, fetchInfos2.head._1)
+    assertEquals(140, fetchInfos2.head._2.fetchOffset)
+
+    // both partitions are delayed
+    val ResultWithPartitions(fetchRequest3, partitionsWithError3) =
+      thread.buildFetchRequest(Seq(
+        (t1p0, new PartitionFetchState(140, delay=new DelayedItem(5000))),
+        (t1p1, new PartitionFetchState(160, delay=new DelayedItem(5000)))))
+    assertTrue("Expected no fetch requests since all partitions are delayed", fetchRequest3.isEmpty)
+    assertFalse(partitionsWithError3.nonEmpty)
+  }
+
+  def stub(replicaT1p0: Replica, replicaT1p1: Replica, futureReplica: Replica, partition:
Partition, replicaManager: ReplicaManager) = {
+    expect(replicaManager.getReplica(t1p0)).andReturn(Some(replicaT1p0)).anyTimes()
+    expect(replicaManager.getReplica(t1p0, Request.FutureLocalReplicaId)).andReturn(Some(futureReplica)).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replicaT1p0).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p0, Request.FutureLocalReplicaId)).andReturn(futureReplica).anyTimes()
+    expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
+    expect(replicaManager.getReplica(t1p1)).andReturn(Some(replicaT1p1)).anyTimes()
+    expect(replicaManager.getReplica(t1p1, Request.FutureLocalReplicaId)).andReturn(Some(futureReplica)).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p1)).andReturn(replicaT1p1).anyTimes()
+    expect(replicaManager.getReplicaOrException(t1p1, Request.FutureLocalReplicaId)).andReturn(futureReplica).anyTimes()
+    expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes()
+  }
+
+  def stubWithFetchMessages(replicaT1p0: Replica, replicaT1p1: Replica, futureReplica: Replica,
+                            partition: Partition, replicaManager: ReplicaManager,
+                            responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)]
=> Unit]) = {
+    stub(replicaT1p0, replicaT1p1, futureReplica, partition, replicaManager)
+    expect(replicaManager.fetchMessages(
+      EasyMock.anyLong(),
+      EasyMock.anyInt(),
+      EasyMock.anyInt(),
+      EasyMock.anyInt(),
+      EasyMock.anyObject(),
+      EasyMock.anyObject(),
+      EasyMock.anyObject(),
+      EasyMock.capture(responseCallback),
+      EasyMock.anyObject()))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          responseCallback.getValue.apply(Seq.empty[(TopicPartition, FetchPartitionData)])
+        }
+      }).anyTimes()
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.

Mime
View raw message