kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-918; Change log.retention.hours to be log.retention.mins; patched by Alin Vasile; reviewed by Jun Rao
Date Fri, 25 Oct 2013 15:12:42 GMT
Updated Branches:
  refs/heads/trunk 0e90c246c -> 274b12f33


kafka-918; Change log.retention.hours to be log.retention.mins; patched by Alin Vasile; reviewed
by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/274b12f3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/274b12f3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/274b12f3

Branch: refs/heads/trunk
Commit: 274b12f3351e007f76e8ef64ba9baf5b0824e690
Parents: 0e90c24
Author: Alan Vasile <alinachegalati@yahoo.com>
Authored: Fri Oct 25 08:12:36 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Oct 25 08:12:36 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaConfig.scala   | 13 +++-
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 .../unit/kafka/server/KafkaConfigTest.scala     | 67 ++++++++++++++++++++
 3 files changed, 80 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/274b12f3/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ebd171f..74442b6 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -31,6 +31,17 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
     this(new VerifiableProperties(originalProps))
     props.verify()
   }
+  
+  private def getLogRetentionTimeMillis(): Long = {
+    var millisInMinute = 60L * 1000L
+    val millisInHour = 60L * millisInMinute
+    if(props.containsKey("log.retention.minutes")){
+       millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
+    } else {
+       millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
+    }
+    
+  }
 
   /*********** General Configuration ***********/
   
@@ -92,7 +103,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
 
   /* the number of hours to keep a log file before deleting it */
-  val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
+  val logRetentionTimeMillis = getLogRetentionTimeMillis
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
   val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt
> 0).mapValues(_.toInt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/274b12f3/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c148fdf..5e35a89 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -253,7 +253,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends
Logg
                                      flushInterval = config.logFlushIntervalMessages,
                                      flushMs = config.logFlushIntervalMs.toLong,
                                      retentionSize = config.logRetentionBytes,
-                                     retentionMs = 60L * 60L * 1000L * config.logRetentionHours,
+                                     retentionMs = config.logRetentionTimeMillis,
                                      maxMessageSize = config.messageMaxBytes,
                                      maxIndexSize = config.logIndexSizeMaxBytes,
                                      indexInterval = config.logIndexIntervalBytes,

http://git-wip-us.apache.org/repos/asf/kafka/blob/274b12f3/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
new file mode 100644
index 0000000..2f75e1d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.junit.Test
+import junit.framework.Assert._
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.TestUtils
+
+class KafkaConfigTest extends JUnit3Suite {
+
+  @Test
+  def testLogRetentionTimeHoursProvided() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("log.retention.hours", "1")
+
+    val cfg = new KafkaConfig(props)
+    assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+  }
+  
+  @Test
+  def testLogRetentionTimeMinutesProvided() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("log.retention.minutes", "30")
+
+    val cfg = new KafkaConfig(props)
+    assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+  }
+  
+  @Test
+  def testLogRetentionTimeNoConfigProvided() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+
+    val cfg = new KafkaConfig(props)
+    assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+  }
+  
+  @Test
+  def testLogRetentionTimeBothMinutesAndHoursProvided() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("log.retention.minutes", "30")
+    props.put("log.retention.hours", "1")
+
+    val cfg = new KafkaConfig(props)
+    assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
+
+  }
+
+}
\ No newline at end of file


Mime
View raw message