kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Refactor replica log dir fetching for improved logging (#6313)
Date Tue, 26 Feb 2019 16:52:25 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 66a6fc7  MINOR: Refactor replica log dir fetching for improved logging (#6313)
66a6fc7 is described below

commit 66a6fc7204b3903ae8015bc8bceb200b6838ec10
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Feb 26 08:38:32 2019 -0800

    MINOR: Refactor replica log dir fetching for improved logging (#6313)
    
    In order to debug problems with log directory reassignments, it is helpful to know when
    the fetcher thread begins moving a particular partition. This patch refactors the fetch
    logic so that we stick to a selected partition as long as it is available and log a
    message when a different partition is selected.
    
    Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Dong Lin <lindong28@gmail.com>,
    Jun Rao <junrao@gmail.com>
---
 .../kafka/server/ReplicaAlterLogDirsThread.scala   | 70 ++++++++++++++++------
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  4 +-
 2 files changed, 55 insertions(+), 19 deletions(-)
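
A minimal, runnable sketch of the sticky selection logic this patch introduces, using
simplified stand-in types (the real code uses org.apache.kafka.common.TopicPartition and
kafka.server.PartitionFetchState, and logs via the Logging trait's info(); the
StickyPartitionSelector wrapper and select method below are hypothetical, for illustration
only):

case class TopicPartition(topic: String, partition: Int)
case class PartitionFetchState(fetchOffset: Long, isReadyForFetch: Boolean)

class StickyPartitionSelector {
  // The partition currently being copied, if any (mirrors inProgressPartition in the patch).
  private var inProgressPartition: Option[TopicPartition] = None

  // Smallest ready partition, ordered ascending by (topic, partition).
  private def nextReadyPartition(partitionMap: Map[TopicPartition, PartitionFetchState])
      : Option[(TopicPartition, PartitionFetchState)] = {
    partitionMap.filter { case (_, state) => state.isReadyForFetch }
      .reduceLeftOption { (left, right) =>
        if (left._1.topic < right._1.topic ||
            (left._1.topic == right._1.topic && left._1.partition < right._1.partition))
          left
        else
          right
      }
  }

  // Stick with the in-progress partition while it is still ready; otherwise pick the
  // smallest ready partition, remember it, and log the switch.
  def select(partitionMap: Map[TopicPartition, PartitionFetchState])
      : Option[(TopicPartition, PartitionFetchState)] = {
    val current = inProgressPartition.flatMap { tp =>
      partitionMap.get(tp).filter(_.isReadyForFetch).map(state => (tp, state))
    }
    current.orElse {
      val next = nextReadyPartition(partitionMap)
      inProgressPartition = next.map(_._1)
      next.foreach { case (tp, state) =>
        // The real thread uses info() here; println stands in for logging.
        println(s"Beginning/resuming copy of partition $tp from offset ${state.fetchOffset}")
      }
      next
    }
  }
}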

diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 54bb2a2..8df234e 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -51,6 +51,7 @@ class ReplicaAlterLogDirsThread(name: String,
   private val replicaId = brokerConfig.brokerId
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
+  private var inProgressPartition: Option[TopicPartition] = None
 
   override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
     replicaMgr.futureLocalReplicaOrException(topicPartition).latestEpoch
@@ -185,32 +186,52 @@ class ReplicaAlterLogDirsThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = true)
   }
 
-  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
-    // Only include replica in the fetch request if it is not throttled.
-    val maxPartitionOpt = partitionMap.filter { case (_, partitionFetchState) =>
-      partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded
+  private def nextReadyPartition(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+    partitionMap.filter { case (_, partitionFetchState) =>
+      partitionFetchState.isReadyForFetch
     }.reduceLeftOption { (left, right) =>
-      if ((left._1.topic > right._1.topic()) || (left._1.topic == right._1.topic() && left._1.partition() >= right._1.partition()))
+      if ((left._1.topic < right._1.topic) || (left._1.topic == right._1.topic && left._1.partition < right._1.partition))
         left
       else
         right
     }
+  }
+
+  private def selectPartitionToFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
+    // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on
+    // moving any given replica. Replicas are selected in ascending order (lexicographically by topic) from the
+    // partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it
+    // becomes unavailable or is removed.
+
+    inProgressPartition.foreach { tp =>
+      val fetchStateOpt = partitionMap.get(tp)
+      fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState =>
+        return Some((tp, fetchState))
+      }
+    }
+
+    inProgressPartition = None
+
+    val nextPartitionOpt = nextReadyPartition(partitionMap)
+    nextPartitionOpt.foreach { case (tp, fetchState) =>
+      inProgressPartition = Some(tp)
+      info(s"Beginning/resuming copy of partition $tp from offset ${fetchState.fetchOffset}. " +
+        s"Including this partition, there are ${partitionMap.size} remaining partitions to copy by this thread.")
+    }
+    nextPartitionOpt
+  }
 
-    // Only move one replica at a time to increase its catch-up rate and thus reduce the time spent on moving any given replica
-    // Replicas are ordered by their TopicPartition
+  private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = {
     val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
     val partitionsWithError = mutable.Set[TopicPartition]()
 
-    if (maxPartitionOpt.nonEmpty) {
-      val (topicPartition, partitionFetchState) = maxPartitionOpt.get
-      try {
-        val logStartOffset = replicaMgr.futureLocalReplicaOrException(topicPartition).logStartOffset
-        requestMap.put(topicPartition, new FetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset,
-          fetchSize, Optional.of(partitionFetchState.currentLeaderEpoch)))
-      } catch {
-        case _: KafkaStorageException =>
-          partitionsWithError += topicPartition
-      }
+    try {
+      val logStartOffset = replicaMgr.futureLocalReplicaOrException(tp).logStartOffset
+      requestMap.put(tp, new FetchRequest.PartitionData(fetchState.fetchOffset, logStartOffset,
+        fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
+    } catch {
+      case _: KafkaStorageException =>
+        partitionsWithError += tp
     }
 
     val fetchRequestOpt = if (requestMap.isEmpty) {
@@ -221,7 +242,22 @@ class ReplicaAlterLogDirsThread(name: String,
      Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
         .setMaxBytes(maxBytes))
     }
+
     ResultWithPartitions(fetchRequestOpt, partitionsWithError)
   }
 
+  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
+    // Only include replica in the fetch request if it is not throttled.
+    if (quota.isQuotaExceeded) {
+      ResultWithPartitions(None, Set.empty)
+    } else {
+      selectPartitionToFetch(partitionMap) match {
+        case Some((tp, fetchState)) =>
+          buildFetchForPartition(tp, fetchState)
+        case None =>
+          ResultWithPartitions(None, Set.empty)
+      }
+    }
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index d21e1e1..779c0e5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -526,8 +526,8 @@ class ReplicaAlterLogDirsThreadTest {
     assertEquals(0, request.minBytes)
     val fetchInfos = request.fetchData.asScala.toSeq
     assertEquals(1, fetchInfos.length)
-    assertEquals("Expected fetch request for largest partition", t1p1, fetchInfos.head._1)
-    assertEquals(160, fetchInfos.head._2.fetchOffset)
+    assertEquals("Expected fetch request for first partition", t1p0, fetchInfos.head._1)
+    assertEquals(150, fetchInfos.head._2.fetchOffset)
   }
 
   @Test
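
For reference, a tiny check mirroring the updated test expectations above. It reuses the
hypothetical StickyPartitionSelector sketch from earlier in this message; the topic name
"topic1" is an assumption, while the offsets 150/160 come from the test. With both
partitions ready, the new ascending comparator picks the smallest partition first, which
is why the assertions now expect t1p0 at offset 150 rather than t1p1 at 160:

val selector = new StickyPartitionSelector
val picked = selector.select(Map(
  TopicPartition("topic1", 0) -> PartitionFetchState(fetchOffset = 150L, isReadyForFetch = true),
  TopicPartition("topic1", 1) -> PartitionFetchState(fetchOffset = 160L, isReadyForFetch = true)))
// The smallest (topic, partition) pair wins the first selection.
assert(picked.map(_._1).contains(TopicPartition("topic1", 0)))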

