Repository: kafka
Updated Branches:
refs/heads/trunk 761711eca -> 3139cc31f
kafka-1990; Add unlimited time-based log retention; patched by Jeff Holoman; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3139cc31
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3139cc31
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3139cc31
Branch: refs/heads/trunk
Commit: 3139cc31f6cab26f3dfd429c36280d1567dbf61a
Parents: 761711e
Author: Jeff Holoman <jeff.holoman@gmail.com>
Authored: Sun Apr 19 11:46:18 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Sun Apr 19 11:46:18 2015 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogConfig.scala | 11 +--
core/src/main/scala/kafka/log/LogManager.scala | 2 +
.../main/scala/kafka/server/KafkaConfig.scala | 24 +++--
.../test/scala/kafka/log/LogConfigTest.scala | 95 -------------------
.../test/scala/unit/kafka/admin/AdminTest.scala | 2 +-
.../scala/unit/kafka/log/LogConfigTest.scala | 99 ++++++++++++++++++++
.../unit/kafka/server/KafkaConfigTest.scala | 53 +++++++++++
7 files changed, 175 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 558c703..da55a34 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,8 +21,6 @@ import java.util.Properties
import org.apache.kafka.common.utils.Utils
import scala.collection._
import org.apache.kafka.common.config.ConfigDef
-import kafka.common._
-import scala.collection.JavaConversions._
import kafka.message.BrokerCompressionCodec
object Defaults {
@@ -93,7 +91,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
props.put(FlushMessagesProp, flushInterval.toString)
props.put(FlushMsProp, flushMs.toString)
props.put(RetentionBytesProp, retentionSize.toString)
- props.put(RententionMsProp, retentionMs.toString)
+ props.put(RetentionMsProp, retentionMs.toString)
props.put(MaxMessageBytesProp, maxMessageSize.toString)
props.put(IndexIntervalBytesProp, indexInterval.toString)
props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
@@ -122,7 +120,7 @@ object LogConfig {
val FlushMessagesProp = "flush.messages"
val FlushMsProp = "flush.ms"
val RetentionBytesProp = "retention.bytes"
- val RententionMsProp = "retention.ms"
+ val RetentionMsProp = "retention.ms"
val MaxMessageBytesProp = "max.message.bytes"
val IndexIntervalBytesProp = "index.interval.bytes"
val DeleteRetentionMsProp = "delete.retention.ms"
@@ -172,7 +170,8 @@ object LogConfig {
.define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, FlushMsDoc)
// can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize
.define(RetentionBytesProp, LONG, Defaults.RetentionSize, MEDIUM, RetentionSizeDoc)
- .define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0), MEDIUM, RetentionMsDoc)
+ // can be negative. See kafka.log.LogManager.cleanupExpiredSegments
+ .define(RetentionMsProp, LONG, Defaults.RetentionMs, MEDIUM, RetentionMsDoc)
.define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, atLeast(0), MEDIUM, MaxMessageSizeDoc)
.define(IndexIntervalBytesProp, INT, Defaults.IndexInterval, atLeast(0), MEDIUM, IndexIntervalDoc)
.define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM, DeleteRetentionMsDoc)
@@ -206,7 +205,7 @@ object LogConfig {
flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long],
flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long],
- retentionMs = parsed.get(RententionMsProp).asInstanceOf[Long],
+ retentionMs = parsed.get(RetentionMsProp).asInstanceOf[Long],
maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long],
http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a7a9b85..e781eba 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -407,6 +407,8 @@ class LogManager(val logDirs: Array[File],
* Runs through the log removing segments older than a certain age
*/
private def cleanupExpiredSegments(log: Log): Int = {
+ if (log.config.retentionMs < 0)
+ return 0
val startMs = time.milliseconds
log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 69b772c..cfbbd2b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -417,7 +417,7 @@ object KafkaConfig {
.define(LogRetentionTimeMillisProp, LONG, HIGH, LogRetentionTimeMillisDoc, false)
.define(LogRetentionTimeMinutesProp, INT, HIGH, LogRetentionTimeMinsDoc, false)
- .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, atLeast(1), HIGH, LogRetentionTimeHoursDoc)
+ .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, HIGH, LogRetentionTimeHoursDoc)
.define(LogRetentionBytesProp, LONG, Defaults.LogRetentionBytes, HIGH, LogRetentionBytesDoc)
.define(LogCleanupIntervalMsProp, LONG, Defaults.LogCleanupIntervalMs, atLeast(1),
MEDIUM, LogCleanupIntervalMsDoc)
@@ -770,12 +770,16 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
- _logRetentionTimeMillis.getOrElse(
- _logRetentionTimeMins match {
- case Some(mins) => millisInMinute * mins
- case None => millisInHour * logRetentionTimeHours
- }
- )
+ val millis = {
+ _logRetentionTimeMillis.getOrElse(
+ _logRetentionTimeMins match {
+ case Some(mins) => millisInMinute * mins
+ case None => millisInHour * logRetentionTimeHours
+ }
+ )
+ }
+ if (millis < 0) return -1
+ millis
}
private def getMap(propName: String, propValue: String): Map[String, String] = {
@@ -834,8 +838,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
- require(logRetentionTimeMillis >= 1, "log.retention.ms must be equal or greater than 1")
- require(_logRetentionTimeMins.forall(_ >= 1), "log.retention.minutes must be equal or greater than 1")
+
+ require(_logRetentionTimeMins.forall(_ >= 1)|| _logRetentionTimeMins.forall(_ .equals(-1)), "log.retention.minutes must be unlimited (-1) or, equal or greater than 1")
+ require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours must be unlimited (-1) or, equal or greater than 1")
+ require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")
require(logDirs.size > 0)
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/test/scala/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/kafka/log/LogConfigTest.scala
deleted file mode 100644
index 9690f14..0000000
--- a/core/src/test/scala/kafka/log/LogConfigTest.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import org.apache.kafka.common.config.ConfigException
-import org.scalatest.junit.JUnit3Suite
-import org.junit.{Assert, Test}
-import java.util.Properties
-
-class LogConfigTest extends JUnit3Suite {
-
- @Test
- def testFromPropsDefaults() {
- val defaults = new Properties()
- defaults.put(LogConfig.SegmentBytesProp, "4242")
- val props = new Properties(defaults)
-
- val config = LogConfig.fromProps(props)
-
- Assert.assertEquals(4242, config.segmentSize)
- Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize)
- Assert.assertEquals("producer", config.compressionType)
- }
-
- @Test
- def testFromPropsEmpty() {
- val p = new Properties()
- val config = LogConfig.fromProps(p)
- Assert.assertEquals(LogConfig(), config)
- }
-
- @Test
- def testFromPropsToProps() {
- import scala.util.Random._
- val expected = new Properties()
- LogConfig.configNames().foreach((name) => {
- name match {
- case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
- case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer", "uncompressed", "gzip"))
- case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete))
- case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
- case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
- case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
- case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
- }
- })
-
- val actual = LogConfig.fromProps(expected).toProps
- Assert.assertEquals(expected, actual)
- }
-
- @Test
- def testFromPropsInvalid() {
- LogConfig.configNames().foreach((name) => {
- name match {
- case LogConfig.UncleanLeaderElectionEnableProp => return
- case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
- case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
- case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
- case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
- case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
- }
- })
- }
-
- private def assertPropertyInvalid(name: String, values: AnyRef*) {
- values.foreach((value) => {
- val props = new Properties
- props.setProperty(name, value.toString)
- intercept[ConfigException] {
- LogConfig.fromProps(props)
- }
- })
- }
-
- private def randFrom[T](choices: T*): T = {
- import scala.util.Random
- choices(Random.nextInt(choices.size))
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index cfe38df..4b728a1 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -370,7 +370,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
def makeConfig(messageSize: Int, retentionMs: Long) = {
var props = new Properties()
props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
- props.setProperty(LogConfig.RententionMsProp, retentionMs.toString)
+ props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString)
props
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
new file mode 100644
index 0000000..f3546ad
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.log
+
+import java.util.Properties
+
+import kafka.log.{Defaults, LogConfig}
+import org.apache.kafka.common.config.ConfigException
+import org.junit.{Assert, Test}
+import org.scalatest.junit.JUnit3Suite
+
+class LogConfigTest extends JUnit3Suite {
+
+ @Test
+ def testFromPropsDefaults() {
+ val defaults = new Properties()
+ defaults.put(LogConfig.SegmentBytesProp, "4242")
+ val props = new Properties(defaults)
+
+ val config = LogConfig.fromProps(props)
+
+ Assert.assertEquals(4242, config.segmentSize)
+ Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize)
+ Assert.assertEquals("producer", config.compressionType)
+ }
+
+ @Test
+ def testFromPropsEmpty() {
+ val p = new Properties()
+ val config = LogConfig.fromProps(p)
+ Assert.assertEquals(LogConfig(), config)
+ }
+
+ @Test
+ def testFromPropsToProps() {
+ import scala.util.Random._
+ val expected = new Properties()
+ LogConfig.configNames().foreach((name) => {
+ name match {
+ case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
+ case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer", "uncompressed", "gzip"))
+ case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete))
+ case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
+ case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
+ case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
+ case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString)
+ case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
+ }
+ })
+
+ val actual = LogConfig.fromProps(expected).toProps
+ Assert.assertEquals(expected, actual)
+ }
+
+ @Test
+ def testFromPropsInvalid() {
+ LogConfig.configNames().foreach((name) => {
+ name match {
+ case LogConfig.UncleanLeaderElectionEnableProp => return
+ case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
+ case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" )
+ case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
+ case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
+ case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
+ case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
+ }
+ })
+ }
+
+ private def assertPropertyInvalid(name: String, values: AnyRef*) {
+ values.foreach((value) => {
+ val props = new Properties
+ props.setProperty(name, value.toString)
+ intercept[ConfigException] {
+ LogConfig.fromProps(props)
+ }
+ })
+ }
+
+ private def randFrom[T](choices: T*): T = {
+ import scala.util.Random
+ choices(Random.nextInt(choices.size))
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 40c265a..2428dbd 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -83,6 +83,59 @@ class KafkaConfigTest extends JUnit3Suite {
val cfg = KafkaConfig.fromProps(props)
assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
}
+ @Test
+ def testLogRetentionUnlimited() {
+ val props1 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
+ val props2 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
+ val props3 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
+ val props4 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
+ val props5 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
+
+ props1.put("log.retention.ms", "-1")
+ props2.put("log.retention.minutes", "-1")
+ props3.put("log.retention.hours", "-1")
+
+ val cfg1 = KafkaConfig.fromProps(props1)
+ val cfg2 = KafkaConfig.fromProps(props2)
+ val cfg3 = KafkaConfig.fromProps(props3)
+ assertEquals("Should be -1", -1, cfg1.logRetentionTimeMillis)
+ assertEquals("Should be -1", -1, cfg2.logRetentionTimeMillis)
+ assertEquals("Should be -1", -1, cfg3.logRetentionTimeMillis)
+
+ props4.put("log.retention.ms", "-1")
+ props4.put("log.retention.minutes", "30")
+
+ val cfg4 = KafkaConfig.fromProps(props4)
+ assertEquals("Should be -1", -1, cfg4.logRetentionTimeMillis)
+
+ props5.put("log.retention.ms", "0")
+
+ intercept[IllegalArgumentException] {
+ val cfg5 = KafkaConfig.fromProps(props5)
+ }
+ }
+
+ @Test
+ def testLogRetentionValid {
+ val props1 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ val props2 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ val props3 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+
+ props1.put("log.retention.ms", "0")
+ props2.put("log.retention.minutes", "0")
+ props3.put("log.retention.hours", "0")
+
+ intercept[IllegalArgumentException] {
+ val cfg1 = KafkaConfig.fromProps(props1)
+ }
+ intercept[IllegalArgumentException] {
+ val cfg2 = KafkaConfig.fromProps(props2)
+ }
+ intercept[IllegalArgumentException] {
+ val cfg3 = KafkaConfig.fromProps(props3)
+ }
+
+ }
@Test
def testAdvertiseDefaults() {
|