kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6432: Make index lookup more cache friendly (#5346)
Date Fri, 27 Jul 2018 21:00:25 GMT
This is an automated email from the ASF dual-hosted git repository.

sriharsha pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a61594d  KAFKA-6432: Make index lookup more cache friendly (#5346)
a61594d is described below

commit a61594dee143ba089300546d1994b875a16ba521
Author: ying-zheng <zheng.ying@rocketmail.com>
AuthorDate: Fri Jul 27 14:00:19 2018 -0700

    KAFKA-6432: Make index lookup more cache friendly (#5346)
    
     KAFKA-6432: Make index lookup more cache friendly
    
    For each topic-partition, Kafka broker maintains two indices: one for message offset,
one for message timestamp. By default, a new index entry is appended to each index for every
4KB messages. The lookup of the indices is a simple binary search. The indices are mmaped
files, and cached by Linux page cache.
    
    Both consumer fetch and follower fetch have to do an offset lookup, before accessing the
actual message data. The simple binary search algorithm used for looking up the index is not
cache friendly, and may cause page faults even on high QPS topic-partitions.
    
    In a normal Kafka broker, all the follower fetch requests, and most consumer fetch requests
should only look up the last few entries of the index. We can make the index lookup more cache
friendly, by searching in the last one or two pages of the index first.
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Guozhang Wang <wangguoz@gmail.com>,
Ted Yu <yuzhihong@gmail.com>,  Ismael Juma <github@juma.me.uk>, Sriharsha Chintalapani
<sriharsha@apache.org>
---
 core/src/main/scala/kafka/log/AbstractIndex.scala | 101 ++++++++++++++++++----
 1 file changed, 84 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index e653802..ec9d55f 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -44,9 +44,67 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset:
Lon
   // Length of the index file
   @volatile
   private var _length: Long = _
-
   protected def entrySize: Int
 
+  /*
+   Kafka mmaps index files into memory, and all the read / write operations of the index
is through OS page cache. This
+   avoids blocked disk I/O in most cases.
+
+   To the extent of our knowledge, all the modern operating systems use LRU policy or its
variants to manage page
+   cache. Kafka always appends to the end of the index file, and almost all the index lookups
(typically from in-sync
+   followers or consumers) are very close to the end of the index. So, the LRU cache replacement
policy should work very
+   well with Kafka's index access pattern.
+
+   However, when looking up index, the standard binary search algorithm is not cache friendly,
and can cause unnecessary
+   page faults (the thread is blocked to wait for reading some index entries from hard disk,
as those entries are not
+   cached in the page cache).
+
+   For example, in an index with 13 pages, to lookup an entry in the last page (page #12),
the standard binary search
+   algorithm will read index entries in page #0, 6, 9, 11, and 12.
+   page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 |
+   steps:       |1| | | | | |3| | |4|  |5 |2/6|
+   In each page, there are hundreds log entries, corresponding to hundreds to thousands of
kafka messages. When the
+   index gradually growing from the 1st entry in page #12 to the last entry in page #12,
all the write (append)
+   operations are in page #12, and all the in-sync follower / consumer lookups read page
#0,6,9,11,12. As these pages
+   are always used in each in-sync lookup, we can assume these pages are fairly recently
used, and are very likely to be
+   in the page cache. When the index grows to page #13, the pages needed in a in-sync lookup
change to #0, 7, 10, 12,
+   and 13:
+   page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 |
+   steps:       |1| | | | | | |3| | | 4|5 | 6|2/7|
+   Page #7 and page #10 have not been used for a very long time. They are much less likely
to be in the page cache, than
+   the other pages. The 1st lookup, after the 1st index entry in page #13 is appended, is
likely to have to read page #7
+   and page #10 from disk (page fault), which can take up to more than a second. In our test,
this can cause the
+   at-least-once produce latency to jump to about 1 second from a few ms.
+
+   Here, we use a more cache-friendly lookup algorithm:
+   if (target > indexEntry[end - N]) // if the target is in the last N entries of the
index
+      binarySearch(end - N, end)
+   else
+      binarySearch(begin, end - N)
+
+   If possible, we only look up in the last N entries of the index. By choosing a proper
constant N, all the in-sync
+   lookups should go to the 1st branch. We call the last N entries the "warm" section. As
we frequently look up in this
+   relatively small section, the pages containing this section are more likely to be in the
page cache.
+
+   We set N (_warmEntries) to 8192, because
+   1. This number is small enough to guarantee all the pages of the "warm" section is touched
in every warm-section
+      lookup. So that, the entire warm section is really "warm".
+      When doing warm-section lookup, following 3 entries are always touched: indexEntry(end),
indexEntry(end-N),
+      and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3
or fewer) are touched, when we
+      touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors
(x86-32, x86-64, MIPS,
+      SPARC, Power, ARM etc.).
+   2. This number is large enough to guarantee most of the in-sync lookups are in the warm-section.
With default Kafka
+      settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) log
messages.
+
+   We can't set make N (_warmEntries) to be larger than 8192, as there is no simple way to
guarantee all the "warm"
+   section pages are really warm (touched in every lookup) on a typical 4KB-page host.
+
+   In there future, we may use a backend thread to periodically touch the entire warm section.
So that, we can
+   1) support larger warm section
+   2) make sure the warm section of low QPS topic-partitions are really warm.
+ */
+  protected def _warmEntries: Int = 8192 / entrySize
+
   protected val lock = new ReentrantLock
 
   @volatile
@@ -311,26 +369,35 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset:
Lon
     if(_entries == 0)
       return (-1, -1)
 
+    def binarySearch(begin: Int, end: Int) : (Int, Int) = {
+      // binary search for the entry
+      var lo = begin
+      var hi = end
+      while(lo < hi) {
+        val mid = ceil(hi/2.0 + lo/2.0).toInt
+        val found = parseEntry(idx, mid)
+        val compareResult = compareIndexEntry(found, target, searchEntity)
+        if(compareResult > 0)
+          hi = mid - 1
+        else if(compareResult < 0)
+          lo = mid
+        else
+          return (mid, mid)
+      }
+      (lo, if (lo == _entries - 1) -1 else lo + 1)
+    }
+
+    val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
+    // check if the target offset is in the warm section of the index
+    if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
+      return binarySearch(firstHotEntry, _entries - 1)
+    }
+
     // check if the target offset is smaller than the least offset
     if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
       return (-1, 0)
 
-    // binary search for the entry
-    var lo = 0
-    var hi = _entries - 1
-    while(lo < hi) {
-      val mid = ceil(hi/2.0 + lo/2.0).toInt
-      val found = parseEntry(idx, mid)
-      val compareResult = compareIndexEntry(found, target, searchEntity)
-      if(compareResult > 0)
-        hi = mid - 1
-      else if(compareResult < 0)
-        lo = mid
-      else
-        return (mid, mid)
-    }
-
-    (lo, if (lo == _entries - 1) -1 else lo + 1)
+    return binarySearch(0, firstHotEntry)
   }
 
   private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity):
Int = {


Mime
View raw message