Updated Branches:
refs/heads/trunk 0e90c246c -> 274b12f33
kafka-918; Change log.retention.hours to be log.retention.mins; patched by Alin Vasile; reviewed
by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/274b12f3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/274b12f3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/274b12f3
Branch: refs/heads/trunk
Commit: 274b12f3351e007f76e8ef64ba9baf5b0824e690
Parents: 0e90c24
Author: Alan Vasile <alinachegalati@yahoo.com>
Authored: Fri Oct 25 08:12:36 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Oct 25 08:12:36 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/server/KafkaConfig.scala | 13 +++-
.../main/scala/kafka/server/KafkaServer.scala | 2 +-
.../unit/kafka/server/KafkaConfigTest.scala | 67 ++++++++++++++++++++
3 files changed, 80 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/274b12f3/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ebd171f..74442b6 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -31,6 +31,17 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
this(new VerifiableProperties(originalProps))
props.verify()
}
+
+ private def getLogRetentionTimeMillis(): Long = {
+ var millisInMinute = 60L * 1000L
+ val millisInHour = 60L * millisInMinute
+ if(props.containsKey("log.retention.minutes")){
+ millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
+ } else {
+ millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
+ }
+
+ }
/*********** General Configuration ***********/
@@ -92,7 +103,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
/* the number of hours to keep a log file before deleting it */
- val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
+ val logRetentionTimeMillis = getLogRetentionTimeMillis
/* the number of hours to keep a log file before deleting it for some specific topic*/
val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt
> 0).mapValues(_.toInt)
http://git-wip-us.apache.org/repos/asf/kafka/blob/274b12f3/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c148fdf..5e35a89 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -253,7 +253,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends
Logg
flushInterval = config.logFlushIntervalMessages,
flushMs = config.logFlushIntervalMs.toLong,
retentionSize = config.logRetentionBytes,
- retentionMs = 60L * 60L * 1000L * config.logRetentionHours,
+ retentionMs = config.logRetentionTimeMillis,
maxMessageSize = config.messageMaxBytes,
maxIndexSize = config.logIndexSizeMaxBytes,
indexInterval = config.logIndexIntervalBytes,
http://git-wip-us.apache.org/repos/asf/kafka/blob/274b12f3/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
new file mode 100644
index 0000000..2f75e1d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.junit.Test
+import junit.framework.Assert._
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.TestUtils
+
+class KafkaConfigTest extends JUnit3Suite {
+
+ @Test
+ def testLogRetentionTimeHoursProvided() {
+ val props = TestUtils.createBrokerConfig(0, 8181)
+ props.put("log.retention.hours", "1")
+
+ val cfg = new KafkaConfig(props)
+ assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+ }
+
+ @Test
+ def testLogRetentionTimeMinutesProvided() {
+ val props = TestUtils.createBrokerConfig(0, 8181)
+ props.put("log.retention.minutes", "30")
+
+ val cfg = new KafkaConfig(props)
+ assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+ }
+
+ @Test
+ def testLogRetentionTimeNoConfigProvided() {
+ val props = TestUtils.createBrokerConfig(0, 8181)
+
+ val cfg = new KafkaConfig(props)
+ assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+ }
+
+ @Test
+ def testLogRetentionTimeBothMinutesAndHoursProvided() {
+ val props = TestUtils.createBrokerConfig(0, 8181)
+ props.put("log.retention.minutes", "30")
+ props.put("log.retention.hours", "1")
+
+ val cfg = new KafkaConfig(props)
+ assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+ }
+
+}
\ No newline at end of file
|