kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-4099; Fix the potential frequent log rolling
Date Fri, 02 Sep 2016 19:49:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8f3462552 -> af9fc503d


KAFKA-4099; Fix the potential frequent log rolling

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #1809 from becketqin/KAFKA-4099


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af9fc503
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af9fc503
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af9fc503

Branch: refs/heads/trunk
Commit: af9fc503dea5058df890fbd79249abb7634e06bc
Parents: 8f34625
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Sep 2 12:49:34 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Sep 2 12:49:34 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 11 ++++++----
 core/src/main/scala/kafka/log/LogSegment.scala  | 21 ++++++++++++--------
 .../src/test/scala/unit/kafka/log/LogTest.scala | 19 ++++++++++++------
 docs/upgrade.html                               |  2 +-
 4 files changed, 34 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d343d6f..894beab 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -401,7 +401,8 @@ class Log(val dir: File,
         }
 
         // maybe roll the log if this segment is full
-        val segment = maybeRoll(validMessages.sizeInBytes)
+        val segment = maybeRoll(messagesSize = validMessages.sizeInBytes,
+                                maxTimestampInMessages = appendInfo.maxTimestamp)
 
         // now append to the log
         segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp,
@@ -736,6 +737,7 @@ class Log(val dir: File,
    * Roll the log over to a new empty log segment if necessary.
    *
    * @param messagesSize The messages set size in bytes
+   * @param maxTimestampInMessages The maximum timestamp in the messages.
    * logSegment will be rolled if one of the following conditions met
    * <ol>
    * <li> The logSegment is full
@@ -745,16 +747,17 @@ class Log(val dir: File,
    * </ol>
    * @return The currently active segment after (perhaps) rolling to a new segment
    */
-  private def maybeRoll(messagesSize: Int): LogSegment = {
+  private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long): LogSegment = {
     val segment = activeSegment
-    val reachedRollMs = segment.timeWaitedForRoll(time.milliseconds) > config.segmentMs
- segment.rollJitterMs
+    val now = time.milliseconds
+    val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs
- segment.rollJitterMs
     if (segment.size > config.segmentSize - messagesSize ||
         (segment.size > 0 && reachedRollMs) ||
         segment.index.isFull || segment.timeIndex.isFull) {
       debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}},
" +
           s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " +
           s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries},
" +
-          s"inactive_time_ms = ${segment.timeWaitedForRoll(time.milliseconds)}/${config.segmentMs
- segment.rollJitterMs}).")
+          s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs
- segment.rollJitterMs}).")
       roll()
     } else {
       segment

http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 3d94452..ccc2472 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -339,20 +339,25 @@ class LogSegment(val log: FileMessageSet,
   }
 
   /**
-   * The time this segment has waited to be rolled. If the first message in the segment does
not have a timestamp,
-   * the time is based on the create time of the segment. Otherwise the time is based on
the timestamp of that message.
+   * The time this segment has waited to be rolled.
+   * If the first message has a timestamp we use the message timestamp to determine when
to roll a segment. A segment
+   * is rolled if the difference between the new message's timestamp and the first message's
timestamp exceeds the
+   * segment rolling time.
+   * If the first message does not have a timestamp, we use the wall clock time to determine
when to roll a segment. A
+   * segment is rolled if the difference between the current wall clock time and the segment
create time exceeds the
+   * segment rolling time.
    */
-  def timeWaitedForRoll(now: Long) : Long= {
+  def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
     // Load the timestamp of the first message into memory
-    if (!rollingBasedTimestamp.isDefined) {
+    if (rollingBasedTimestamp.isEmpty) {
       val iter = log.iterator
       if (iter.hasNext)
         rollingBasedTimestamp = Some(iter.next.message.timestamp)
-      else
-        // If the log is empty, we return time elapsed since the segment is created.
-        return now - created
     }
-    now - {if (rollingBasedTimestamp.get >= 0) rollingBasedTimestamp.get else created}
+    rollingBasedTimestamp match {
+      case Some(t) if t >= 0 => messageTimestamp - t
+      case _ => now - created
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 7f6ef6e..4935aae 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -75,6 +75,7 @@ class LogTest extends JUnitSuite {
                       scheduler = time.scheduler,
                       time = time)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
+    // Test the segment rolling behavior when messages do not have a timestamp.
     time.sleep(log.config.segmentMs + 1)
     log.append(set)
     assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments)
@@ -88,19 +89,25 @@ class LogTest extends JUnitSuite {
       assertEquals("Changing time beyond rollMs and appending should create a new segment.",
numSegments, log.numberOfSegments)
     }
 
-    time.sleep(log.config.segmentMs + 1)
+    // Append a message with timestamp to a segment whose first message does not have a timestamp.
     val setWithTimestamp =
       TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds
+ log.config.segmentMs + 1)
     log.append(setWithTimestamp)
-    assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments)
+    assertEquals("Segment should not have been rolled out because the log rolling should
be based on wall clock.", 4, log.numberOfSegments)
 
+    // Test the segment rolling behavior when messages have timestamps.
     time.sleep(log.config.segmentMs + 1)
-    log.append(set)
-    assertEquals("Log should not roll because the roll should depend on the index of the
first time index entry.", 5, log.numberOfSegments)
+    log.append(setWithTimestamp)
+    assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments)
 
+    // move the wall clock beyond log rolling time
     time.sleep(log.config.segmentMs + 1)
-    log.append(set)
-    assertEquals("Log should roll because the time since the timestamp of first time index
entry has expired.", 6, log.numberOfSegments)
+    log.append(setWithTimestamp)
+    assertEquals("Log should not roll because the roll should depend on timestamp of the
first message.", 5, log.numberOfSegments)
+
+    val setWithExpiredTimestamp = TestUtils.singleMessageSet(payload = "test".getBytes, timestamp
= time.milliseconds)
+    log.append(setWithExpiredTimestamp)
+    assertEquals("Log should roll because the timestamp in the message should make the log
segment expire.", 6, log.numberOfSegments)
 
     val numSegments = log.numberOfSegments
     time.sleep(log.config.segmentMs + 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/af9fc503/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index eef21cf..d4ba71a 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -26,7 +26,7 @@ However, please notice the <a href="#upgrade_10_1_breaking">Potential
breaking c
 <h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential breaking
changes in 0.10.1.0</a></h5>
 <ul>
     <li> The log retention time is no longer based on last modified time of the log
segments. Instead it will be based on the largest timestamp of the messages in a log segment.</li>
-    <li> The log rolling time is no longer depending on log segment create time. Instead
it is now based on the timestamp of the first message in a log segment. i.e. if the timestamp
of the first message in the segment is T, the log will be rolled out at T + log.roll.ms </li>
+    <li> The log rolling time is no longer depending on log segment create time. Instead
it is now based on the timestamp in the messages. More specifically, if the timestamp of the
first message in the segment is T, the log will be rolled out when a new message has a timestamp
greater than or equal to T + log.roll.ms </li>
     <li> The open file handlers of 0.10.0 will increase by ~33% because of the addition
of time index files for each segment.</li>
     <li> The time index and offset index share the same index size configuration. Since
each time index entry is 1.5x the size of offset index entry. User may need to increase log.index.size.max.bytes
to avoid potential frequent log rolling. </li>
     <li> Due to the increased number of index files, on some brokers with large amount
the log segments (e.g. >15K), the log loading process during the broker startup could be
longer. Based on our experiment, setting the num.recovery.threads.per.data.dir to one may
reduce the log loading time. </li>


Mime
View raw message