kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7051: Improve the efficiency of ReplicaManager (#5206)
Date Mon, 07 Jan 2019 21:20:24 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 145cad7  KAFKA-7051: Improve the efficiency of ReplicaManager (#5206)
145cad7 is described below

commit 145cad752d720c6ea735a5e2f977d10ace5bcc0f
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Mon Jan 7 13:20:07 2019 -0800

    KAFKA-7051: Improve the efficiency of ReplicaManager (#5206)
    
    Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Dong Lin <lindong28@gmail.com>
---
 .../main/scala/kafka/server/ReplicaManager.scala   | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4cc3feb..33c844f 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -830,10 +830,15 @@ class ReplicaManager(val config: KafkaConfig,
     val logReadResults = readFromLog()
 
     // check if this fetch request can be satisfied right away
-    val logReadResultValues = logReadResults.map { case (_, v) => v }
-    val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum
-    val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) =>
-      errorIncurred || (readResult.error != Errors.NONE))
+    var bytesReadable: Long = 0
+    var errorReadingData = false
+    val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
+    logReadResults.foreach { case (topicPartition, logReadResult) =>
+      if (logReadResult.error != Errors.NONE)
+        errorReadingData = true
+      bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
+      logReadResultMap.put(topicPartition, logReadResult)
+    }
 
     // respond immediately if 1) fetch request does not want to wait
     //                        2) fetch request does not require any data
@@ -847,11 +852,12 @@ class ReplicaManager(val config: KafkaConfig,
       responseCallback(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
-      val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
-        val fetchInfo = fetchInfos.collectFirst {
-          case (tp, v) if tp == topicPartition => v
-        }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
-        (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
+      val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
+      fetchInfos.foreach { case (topicPartition, partitionData) =>
+        logReadResultMap.get(topicPartition).map(logReadResult => {
+          val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
+          fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
+        })
       }
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
         fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)


Mime
View raw message