Repository: kafka
Updated Branches:
refs/heads/0.8.2 4546b9dba -> 75fb44dfd
KAFKA-979 Add jitter for time based rolling; reviewed by Neha Narkhede and Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75fb44df
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75fb44df
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75fb44df
Branch: refs/heads/0.8.2
Commit: 75fb44dfd020384e88a38e426549d86f454a5d1c
Parents: 4546b9d
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Tue Oct 14 21:19:30 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue Oct 14 21:19:33 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 8 ++++--
core/src/main/scala/kafka/log/LogCleaner.scala | 2 +-
core/src/main/scala/kafka/log/LogConfig.scala | 12 +++++++++
core/src/main/scala/kafka/log/LogSegment.scala | 6 +++--
.../main/scala/kafka/server/KafkaConfig.scala | 16 +++++++++++-
.../main/scala/kafka/server/KafkaServer.scala | 3 ++-
.../scala/unit/kafka/log/LogSegmentTest.scala | 2 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 26 ++++++++++++++++++++
8 files changed, 67 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a123cdc..157d673 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -141,6 +141,7 @@ class Log(val dir: File,
startOffset = start,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
+ rollJitterMs = config.randomSegmentJitter,
time = time)
if(!hasIndex) {
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
@@ -156,6 +157,7 @@ class Log(val dir: File,
startOffset = 0,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
+ rollJitterMs = config.randomSegmentJitter,
time = time))
} else {
recoverLog()
@@ -510,7 +512,7 @@ class Log(val dir: File,
private def maybeRoll(messagesSize: Int): LogSegment = {
val segment = activeSegment
if (segment.size > config.segmentSize - messagesSize ||
-        segment.size > 0 && time.milliseconds - segment.created > config.segmentMs ||
+        segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||
segment.index.isFull) {
       debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
.format(name,
@@ -519,7 +521,7 @@ class Log(val dir: File,
segment.index.entries,
segment.index.maxEntries,
time.milliseconds - segment.created,
- config.segmentMs))
+ config.segmentMs - segment.rollJitterMs))
roll()
} else {
segment
@@ -550,6 +552,7 @@ class Log(val dir: File,
startOffset = newOffset,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
+ rollJitterMs = config.randomSegmentJitter,
time = time)
val prev = addSegment(segment)
if(prev != null)
@@ -642,6 +645,7 @@ class Log(val dir: File,
newOffset,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
+ rollJitterMs = config.randomSegmentJitter,
time = time))
updateLogEndOffset(newOffset)
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c20de4a..f8fcb84 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -340,7 +340,7 @@ private[log] class Cleaner(val id: Int,
indexFile.delete()
val messages = new FileMessageSet(logFile)
val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
-    val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time)
+    val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
try {
// clean segments into the new destination segment
http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index d2cc9e3..e48922a 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -18,12 +18,15 @@
package kafka.log
import java.util.Properties
+import org.apache.kafka.common.utils.Utils
+
import scala.collection._
import kafka.common._
object Defaults {
val SegmentSize = 1024 * 1024
val SegmentMs = Long.MaxValue
+ val SegmentJitterMs = 0L
val FlushInterval = Long.MaxValue
val FlushMs = Long.MaxValue
val RetentionSize = Long.MaxValue
@@ -43,6 +46,7 @@ object Defaults {
* Configuration settings for a log
* @param segmentSize The soft maximum for the size of a segment file in the log
* @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
+ * @param segmentJitterMs The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment rolling
 * @param flushInterval The number of messages that can be written to the log before a flush is forced
* @param flushMs The amount of time the log can have dirty data before a flush is forced
* @param retentionSize The approximate total number of bytes this log can use
@@ -60,6 +64,7 @@ object Defaults {
*/
case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
val segmentMs: Long = Defaults.SegmentMs,
+ val segmentJitterMs: Long = Defaults.SegmentJitterMs,
val flushInterval: Long = Defaults.FlushInterval,
val flushMs: Long = Defaults.FlushMs,
val retentionSize: Long = Defaults.RetentionSize,
@@ -79,6 +84,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
import LogConfig._
props.put(SegmentBytesProp, segmentSize.toString)
props.put(SegmentMsProp, segmentMs.toString)
+ props.put(SegmentJitterMsProp, segmentJitterMs.toString)
props.put(SegmentIndexBytesProp, maxIndexSize.toString)
props.put(FlushMessagesProp, flushInterval.toString)
props.put(FlushMsProp, flushMs.toString)
@@ -94,11 +100,15 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
props
}
+
+ def randomSegmentJitter: Long =
+    if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
}
object LogConfig {
val SegmentBytesProp = "segment.bytes"
val SegmentMsProp = "segment.ms"
+ val SegmentJitterMsProp = "segment.jitter.ms"
val SegmentIndexBytesProp = "segment.index.bytes"
val FlushMessagesProp = "flush.messages"
val FlushMsProp = "flush.ms"
@@ -115,6 +125,7 @@ object LogConfig {
val ConfigNames = Set(SegmentBytesProp,
SegmentMsProp,
+ SegmentJitterMsProp,
SegmentIndexBytesProp,
FlushMessagesProp,
FlushMsProp,
@@ -135,6 +146,7 @@ object LogConfig {
def fromProps(props: Properties): LogConfig = {
new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, Defaults.SegmentSize.toString).toInt,
segmentMs = props.getProperty(SegmentMsProp, Defaults.SegmentMs.toString).toLong,
+ segmentJitterMs = props.getProperty(SegmentJitterMsProp, Defaults.SegmentJitterMs.toString).toLong,
maxIndexSize = props.getProperty(SegmentIndexBytesProp, Defaults.MaxIndexSize.toString).toInt,
flushInterval = props.getProperty(FlushMessagesProp, Defaults.FlushInterval.toString).toLong,
flushMs = props.getProperty(FlushMsProp, Defaults.FlushMs.toString).toLong,
http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 7597d30..ac96434 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -44,18 +44,20 @@ class LogSegment(val log: FileMessageSet,
val index: OffsetIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
+ val rollJitterMs: Long,
time: Time) extends Logging {
var created = time.milliseconds
-
+
/* the number of bytes since we last added an entry in the offset index */
private var bytesSinceLastIndexEntry = 0
-  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, time: Time) =
+  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) =
this(new FileMessageSet(file = Log.logFilename(dir, startOffset)),
new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset,
maxIndexSize = maxIndexSize),
startOffset,
indexIntervalBytes,
+ rollJitterMs,
time)
/* Return the size in bytes of this log segment */
http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7fcbc16..6e26c54 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -57,7 +57,18 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
millisInHour * props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
}
}
-
+
+ private def getLogRollTimeJitterMillis(): Long = {
+ val millisInHour = 60L * 60L * 1000L
+
+ if(props.containsKey("log.roll.jitter.ms")) {
+ props.getIntInRange("log.roll.jitter.ms", (0, Int.MaxValue))
+ }
+ else {
+ millisInHour * props.getIntInRange("log.roll.jitter.hours", 0, (0, Int.MaxValue))
+ }
+ }
+
/*********** General Configuration ***********/
/* the broker id for this server */
@@ -131,6 +142,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the maximum time before a new log segment is rolled out */
val logRollTimeMillis = getLogRollTimeMillis
+ /* the maximum jitter to subtract from logRollTimeMillis */
+ val logRollTimeJitterMillis = getLogRollTimeJitterMillis
+
/* the number of hours to keep a log file before deleting it */
val logRetentionTimeMillis = getLogRetentionTimeMillis
http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 3e9e91f..07c0a07 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -309,8 +309,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging
def getLogManager(): LogManager = logManager
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
- val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
+ val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
segmentMs = config.logRollTimeMillis,
+ segmentJitterMs = config.logRollTimeJitterMillis,
flushInterval = config.logFlushIntervalMessages,
flushMs = config.logFlushIntervalMs.toLong,
retentionSize = config.logRetentionBytes,
http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 7b97e6a..03fb351 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -39,7 +39,7 @@ class LogSegmentTest extends JUnit3Suite {
val idxFile = TestUtils.tempFile()
idxFile.delete()
val idx = new OffsetIndex(idxFile, offset, 1000)
- val seg = new LogSegment(ms, idx, offset, 10, SystemTime)
+ val seg = new LogSegment(ms, idx, offset, 10, 0, SystemTime)
segments += seg
seg
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/75fb44df/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a0cbd3b..d670ba7 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -88,6 +88,32 @@ class LogTest extends JUnitSuite {
}
/**
+   * Test for jitter for time based log roll. This test appends messages then changes the time
+ * using the mock clock to force the log to roll and checks the number of segments.
+ */
+ @Test
+ def testTimeBasedLogRollJitter() {
+ val set = TestUtils.singleMessageSet("test".getBytes())
+ val maxJitter = 20 * 60L
+
+ // create a log
+ val log = new Log(logDir,
+ logConfig.copy(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter),
+ recoveryPoint = 0L,
+ scheduler = time.scheduler,
+ time = time)
+ assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
+ log.append(set)
+
+ time.sleep(log.config.segmentMs - maxJitter)
+ log.append(set)
+    assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments);
+ time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
+ log.append(set)
+ assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments)
+ }
+
+ /**
* Test that appending more than the maximum segment size rolls the log
*/
@Test
|