kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-979 Add jitter for time based rolling; reviewed by Neha Narkhede and Joel Koshy
Date Wed, 15 Oct 2014 04:19:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 4546b9dba -> 75fb44dfd


KAFKA-979 Add jitter for time based rolling; reviewed by Neha Narkhede and Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75fb44df
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75fb44df
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75fb44df

Branch: refs/heads/0.8.2
Commit: 75fb44dfd020384e88a38e426549d86f454a5d1c
Parents: 4546b9d
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Tue Oct 14 21:19:30 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue Oct 14 21:19:33 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |  8 ++++--
 core/src/main/scala/kafka/log/LogCleaner.scala  |  2 +-
 core/src/main/scala/kafka/log/LogConfig.scala   | 12 +++++++++
 core/src/main/scala/kafka/log/LogSegment.scala  |  6 +++--
 .../main/scala/kafka/server/KafkaConfig.scala   | 16 +++++++++++-
 .../main/scala/kafka/server/KafkaServer.scala   |  3 ++-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 26 ++++++++++++++++++++
 8 files changed, 67 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a123cdc..157d673 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -141,6 +141,7 @@ class Log(val dir: File,
                                      startOffset = start,
                                      indexIntervalBytes = config.indexInterval, 
                                      maxIndexSize = config.maxIndexSize,
+                                     rollJitterMs = config.randomSegmentJitter,
                                      time = time)
         if(!hasIndex) {
           error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
@@ -156,6 +157,7 @@ class Log(val dir: File,
                                      startOffset = 0,
                                      indexIntervalBytes = config.indexInterval, 
                                      maxIndexSize = config.maxIndexSize,
+                                     rollJitterMs = config.randomSegmentJitter,
                                      time = time))
     } else {
       recoverLog()
@@ -510,7 +512,7 @@ class Log(val dir: File,
   private def maybeRoll(messagesSize: Int): LogSegment = {
     val segment = activeSegment
     if (segment.size > config.segmentSize - messagesSize ||
-        segment.size > 0 && time.milliseconds - segment.created > config.segmentMs ||
+        segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||
         segment.index.isFull) {
       debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
             .format(name,
@@ -519,7 +521,7 @@ class Log(val dir: File,
                     segment.index.entries,
                     segment.index.maxEntries,
                     time.milliseconds - segment.created,
-                    config.segmentMs))
+                    config.segmentMs - segment.rollJitterMs))
       roll()
     } else {
       segment
@@ -550,6 +552,7 @@ class Log(val dir: File,
                                    startOffset = newOffset,
                                    indexIntervalBytes = config.indexInterval, 
                                    maxIndexSize = config.maxIndexSize,
+                                   rollJitterMs = config.randomSegmentJitter,
                                    time = time)
       val prev = addSegment(segment)
       if(prev != null)
@@ -642,6 +645,7 @@ class Log(val dir: File,
                                 newOffset,
                                 indexIntervalBytes = config.indexInterval, 
                                 maxIndexSize = config.maxIndexSize,
+                                rollJitterMs = config.randomSegmentJitter,
                                 time = time))
       updateLogEndOffset(newOffset)
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)

http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c20de4a..f8fcb84 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -340,7 +340,7 @@ private[log] class Cleaner(val id: Int,
     indexFile.delete()
     val messages = new FileMessageSet(logFile)
     val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
-    val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time)
+    val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
 
     try {
       // clean segments into the new destination segment

http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index d2cc9e3..e48922a 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -18,12 +18,15 @@
 package kafka.log
 
 import java.util.Properties
+import org.apache.kafka.common.utils.Utils
+
 import scala.collection._
 import kafka.common._
 
 object Defaults {
   val SegmentSize = 1024 * 1024
   val SegmentMs = Long.MaxValue
+  val SegmentJitterMs = 0L
   val FlushInterval = Long.MaxValue
   val FlushMs = Long.MaxValue
   val RetentionSize = Long.MaxValue
@@ -43,6 +46,7 @@ object Defaults {
  * Configuration settings for a log
  * @param segmentSize The soft maximum for the size of a segment file in the log
  * @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
+ * @param segmentJitterMs The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment rolling
  * @param flushInterval The number of messages that can be written to the log before a flush is forced
  * @param flushMs The amount of time the log can have dirty data before a flush is forced
  * @param retentionSize The approximate total number of bytes this log can use
@@ -60,6 +64,7 @@ object Defaults {
  */
 case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
                      val segmentMs: Long = Defaults.SegmentMs,
+                     val segmentJitterMs: Long = Defaults.SegmentJitterMs,
                      val flushInterval: Long = Defaults.FlushInterval,
                      val flushMs: Long = Defaults.FlushMs,
                      val retentionSize: Long = Defaults.RetentionSize,
@@ -79,6 +84,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
     import LogConfig._
     props.put(SegmentBytesProp, segmentSize.toString)
     props.put(SegmentMsProp, segmentMs.toString)
+    props.put(SegmentJitterMsProp, segmentJitterMs.toString)
     props.put(SegmentIndexBytesProp, maxIndexSize.toString)
     props.put(FlushMessagesProp, flushInterval.toString)
     props.put(FlushMsProp, flushMs.toString)
@@ -94,11 +100,15 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
     props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
     props
   }
+
+  def randomSegmentJitter: Long =
+    if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
 }
 
 object LogConfig {
   val SegmentBytesProp = "segment.bytes"
   val SegmentMsProp = "segment.ms"
+  val SegmentJitterMsProp = "segment.jitter.ms"
   val SegmentIndexBytesProp = "segment.index.bytes"
   val FlushMessagesProp = "flush.messages"
   val FlushMsProp = "flush.ms"
@@ -115,6 +125,7 @@ object LogConfig {
 
   val ConfigNames = Set(SegmentBytesProp,
                         SegmentMsProp,
+                        SegmentJitterMsProp,
                         SegmentIndexBytesProp,
                         FlushMessagesProp,
                         FlushMsProp,
@@ -135,6 +146,7 @@ object LogConfig {
   def fromProps(props: Properties): LogConfig = {
     new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, Defaults.SegmentSize.toString).toInt,
                   segmentMs = props.getProperty(SegmentMsProp, Defaults.SegmentMs.toString).toLong,
+                  segmentJitterMs = props.getProperty(SegmentJitterMsProp, Defaults.SegmentJitterMs.toString).toLong,
                   maxIndexSize = props.getProperty(SegmentIndexBytesProp, Defaults.MaxIndexSize.toString).toInt,
                   flushInterval = props.getProperty(FlushMessagesProp, Defaults.FlushInterval.toString).toLong,
                   flushMs = props.getProperty(FlushMsProp, Defaults.FlushMs.toString).toLong,

http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 7597d30..ac96434 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -44,18 +44,20 @@ class LogSegment(val log: FileMessageSet,
                  val index: OffsetIndex, 
                  val baseOffset: Long, 
                  val indexIntervalBytes: Int,
+                 val rollJitterMs: Long,
                  time: Time) extends Logging {
   
   var created = time.milliseconds
-  
+
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
   
-  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, time: Time) = 
+  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) =
     this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), 
          new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,
+         rollJitterMs,
          time)
     
   /* Return the size in bytes of this log segment */

http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7fcbc16..6e26c54 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -57,7 +57,18 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
        millisInHour * props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
     }
   }
-  
+
+  private def getLogRollTimeJitterMillis(): Long = {
+    val millisInHour = 60L * 60L * 1000L
+
+    if(props.containsKey("log.roll.jitter.ms")) {
+      props.getIntInRange("log.roll.jitter.ms", (0, Int.MaxValue))
+    }
+    else {
+      millisInHour * props.getIntInRange("log.roll.jitter.hours", 0, (0, Int.MaxValue))
+    }
+  }
+
   /*********** General Configuration ***********/
 
   /* the broker id for this server */
@@ -131,6 +142,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the maximum time before a new log segment is rolled out */
   val logRollTimeMillis = getLogRollTimeMillis
 
+  /* the maximum jitter to subtract from logRollTimeMillis */
+  val logRollTimeJitterMillis = getLogRollTimeJitterMillis
+
   /* the number of hours to keep a log file before deleting it */
   val logRetentionTimeMillis = getLogRetentionTimeMillis
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 3e9e91f..07c0a07 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -309,8 +309,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def getLogManager(): LogManager = logManager
   
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
-    val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, 
+    val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
                                      segmentMs = config.logRollTimeMillis,
+                                     segmentJitterMs = config.logRollTimeJitterMillis,
                                      flushInterval = config.logFlushIntervalMessages,
                                      flushMs = config.logFlushIntervalMs.toLong,
                                      retentionSize = config.logRetentionBytes,

http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 7b97e6a..03fb351 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -39,7 +39,7 @@ class LogSegmentTest extends JUnit3Suite {
     val idxFile = TestUtils.tempFile()
     idxFile.delete()
     val idx = new OffsetIndex(idxFile, offset, 1000)
-    val seg = new LogSegment(ms, idx, offset, 10, SystemTime)
+    val seg = new LogSegment(ms, idx, offset, 10, 0, SystemTime)
     segments += seg
     seg
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a0cbd3b..d670ba7 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -88,6 +88,32 @@ class LogTest extends JUnitSuite {
   }
 
   /**
+   * Test for jitter s for time based log roll. This test appends messages then changes the time
+   * using the mock clock to force the log to roll and checks the number of segments.
+   */
+  @Test
+  def testTimeBasedLogRollJitter() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val maxJitter = 20 * 60L
+
+    // create a log
+    val log = new Log(logDir,
+      logConfig.copy(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+    assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
+    log.append(set)
+
+    time.sleep(log.config.segmentMs - maxJitter)
+    log.append(set)
+    assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments);
+    time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
+    log.append(set)
+    assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments)
+  }
+
+  /**
    * Test that appending more than the maximum segment size rolls the log
    */
   @Test


Mime
View raw message