This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 66a6fc7  MINOR: Refactor replica log dir fetching for improved logging (#6313)
66a6fc7 is described below

commit 66a6fc7204b3903ae8015bc8bceb200b6838ec10
Author: Jason Gustafson
AuthorDate: Tue Feb 26 08:38:32 2019 -0800

    MINOR: Refactor replica log dir fetching for improved logging (#6313)

    In order to debug problems with log directory reassignments, it is helpful
    to know when the fetcher thread begins moving a particular partition. This
    patch refactors the fetch logic so that we stick to a selected partition as
    long as it is available and log a message when a different partition is
    selected.

    Reviewers: Viktor Somogyi-Vass, Dong Lin, Jun Rao
---
 .../kafka/server/ReplicaAlterLogDirsThread.scala   | 70 ++++++++++++++++------
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  4 +-
 2 files changed, 55 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 54bb2a2..8df234e 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -51,6 +51,7 @@ class ReplicaAlterLogDirsThread(name: String,
   private val replicaId = brokerConfig.brokerId
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
+  private var inProgressPartition: Option[TopicPartition] = None
 
   override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
     replicaMgr.futureLocalReplicaOrException(topicPartition).latestEpoch
@@ -185,32 +186,52 @@ class ReplicaAlterLogDirsThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = true)
   }
 
-  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
-    // Only include replica in the fetch request if it is not throttled.
-    val maxPartitionOpt = partitionMap.filter { case (_, partitionFetchState) =>
-      partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded
+  private def nextReadyPartition(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+    partitionMap.filter { case (_, partitionFetchState) =>
+      partitionFetchState.isReadyForFetch
     }.reduceLeftOption { (left, right) =>
-      if ((left._1.topic > right._1.topic()) || (left._1.topic == right._1.topic() && left._1.partition() >= right._1.partition()))
+      if ((left._1.topic < right._1.topic) || (left._1.topic == right._1.topic && left._1.partition < right._1.partition))
         left
       else
         right
     }
+  }
+
+  private def selectPartitionToFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+    // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on
+    // moving any given replica. Replicas are selected in ascending order (lexicographically by topic) from the
+    // partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it
+    // becomes unavailable or is removed.
+
+    inProgressPartition.foreach { tp =>
+      val fetchStateOpt = partitionMap.get(tp)
+      fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState =>
+        return Some((tp, fetchState))
+      }
+    }
+
+    inProgressPartition = None
+
+    val nextPartitionOpt = nextReadyPartition(partitionMap)
+    nextPartitionOpt.foreach { case (tp, fetchState) =>
+      inProgressPartition = Some(tp)
+      info(s"Beginning/resuming copy of partition $tp from offset ${fetchState.fetchOffset}. " +
+        s"Including this partition, there are ${partitionMap.size} remaining partitions to copy by this thread.")
+    }
+    nextPartitionOpt
+  }
 
-    // Only move one replica at a time to increase its catch-up rate and thus reduce the time spent on moving any given replica
-    // Replicas are ordered by their TopicPartition
+  private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = {
     val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
     val partitionsWithError = mutable.Set[TopicPartition]()
 
-    if (maxPartitionOpt.nonEmpty) {
-      val (topicPartition, partitionFetchState) = maxPartitionOpt.get
-      try {
-        val logStartOffset = replicaMgr.futureLocalReplicaOrException(topicPartition).logStartOffset
-        requestMap.put(topicPartition, new FetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset,
-          fetchSize, Optional.of(partitionFetchState.currentLeaderEpoch)))
-      } catch {
-        case _: KafkaStorageException =>
-          partitionsWithError += topicPartition
-      }
+    try {
+      val logStartOffset = replicaMgr.futureLocalReplicaOrException(tp).logStartOffset
+      requestMap.put(tp, new FetchRequest.PartitionData(fetchState.fetchOffset, logStartOffset,
+        fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
+    } catch {
+      case _: KafkaStorageException =>
+        partitionsWithError += tp
     }
 
     val fetchRequestOpt = if (requestMap.isEmpty) {
@@ -221,7 +242,22 @@ class ReplicaAlterLogDirsThread(name: String,
       Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
         .setMaxBytes(maxBytes))
     }
+
     ResultWithPartitions(fetchRequestOpt, partitionsWithError)
   }
 
+  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+    // Only include replica in the fetch request if it is not throttled.
+    if (quota.isQuotaExceeded) {
+      ResultWithPartitions(None, Set.empty)
+    } else {
+      selectPartitionToFetch(partitionMap) match {
+        case Some((tp, fetchState)) =>
+          buildFetchForPartition(tp, fetchState)
+        case None =>
+          ResultWithPartitions(None, Set.empty)
+      }
+    }
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index d21e1e1..779c0e5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -526,8 +526,8 @@ class ReplicaAlterLogDirsThreadTest {
     assertEquals(0, request.minBytes)
     val fetchInfos = request.fetchData.asScala.toSeq
    assertEquals(1, fetchInfos.length)
-    assertEquals("Expected fetch request for largest partition", t1p1, fetchInfos.head._1)
-    assertEquals(160, fetchInfos.head._2.fetchOffset)
+    assertEquals("Expected fetch request for first partition", t1p0, fetchInfos.head._1)
+    assertEquals(150, fetchInfos.head._2.fetchOffset)
   }
 
   @Test
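Note for readers skimming the diff: the heart of the change is the "sticky" partition selection in selectPartitionToFetch. What follows is a minimal, standalone Scala sketch of that behaviour for illustration only; it is not code from the patch, and TopicPartition, PartitionFetchState and PartitionSelector here are simplified stand-ins for the real Kafka types.

// Standalone sketch (not part of the patch) of the sticky selection rule described above.
object StickySelectionSketch {

  // Simplified stand-ins for the real Kafka types; illustration only.
  final case class TopicPartition(topic: String, partition: Int)
  final case class PartitionFetchState(fetchOffset: Long, isReadyForFetch: Boolean)

  final class PartitionSelector {
    private var inProgress: Option[TopicPartition] = None

    // Stick with the current partition while it is still present and ready;
    // otherwise pick the smallest ready partition (by topic, then partition) and log the switch.
    def select(partitions: Map[TopicPartition, PartitionFetchState]): Option[TopicPartition] = {
      val sticky = inProgress.filter(tp => partitions.get(tp).exists(_.isReadyForFetch))
      sticky.orElse {
        val next = partitions.collect { case (tp, state) if state.isReadyForFetch => tp }
          .toSeq
          .sortBy(tp => (tp.topic, tp.partition))
          .headOption
        next.foreach(tp => println(s"Beginning/resuming copy of partition $tp"))
        inProgress = next
        next
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val selector = new PartitionSelector
    val state = Map(
      TopicPartition("t1", 1) -> PartitionFetchState(fetchOffset = 160L, isReadyForFetch = true),
      TopicPartition("t1", 0) -> PartitionFetchState(fetchOffset = 150L, isReadyForFetch = true))
    println(selector.select(state)) // Some(TopicPartition(t1,0)): smallest ready partition is chosen first
    println(selector.select(state)) // same partition again: selection is sticky, so no new log line
  }
}

Keeping the in-progress partition until it becomes unavailable is what makes the single info line per switch meaningful: a new log entry appears only when the thread actually starts copying a different replica, which is the logging improvement this patch is after.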