kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1990; Add unlimited time-based log retention; patched by Jeff Holoman; reviewed by Jun Rao
Date Sun, 19 Apr 2015 18:46:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 761711eca -> 3139cc31f


kafka-1990; Add unlimited time-based log retention; patched by Jeff Holoman; reviewed by Jun
Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3139cc31
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3139cc31
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3139cc31

Branch: refs/heads/trunk
Commit: 3139cc31f6cab26f3dfd429c36280d1567dbf61a
Parents: 761711e
Author: Jeff Holoman <jeff.holoman@gmail.com>
Authored: Sun Apr 19 11:46:18 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Sun Apr 19 11:46:18 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogConfig.scala   | 11 +--
 core/src/main/scala/kafka/log/LogManager.scala  |  2 +
 .../main/scala/kafka/server/KafkaConfig.scala   | 24 +++--
 .../test/scala/kafka/log/LogConfigTest.scala    | 95 -------------------
 .../test/scala/unit/kafka/admin/AdminTest.scala |  2 +-
 .../scala/unit/kafka/log/LogConfigTest.scala    | 99 ++++++++++++++++++++
 .../unit/kafka/server/KafkaConfigTest.scala     | 53 +++++++++++
 7 files changed, 175 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 558c703..da55a34 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,8 +21,6 @@ import java.util.Properties
 import org.apache.kafka.common.utils.Utils
 import scala.collection._
 import org.apache.kafka.common.config.ConfigDef
-import kafka.common._
-import scala.collection.JavaConversions._
 import kafka.message.BrokerCompressionCodec
 
 object Defaults {
@@ -93,7 +91,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
     props.put(FlushMessagesProp, flushInterval.toString)
     props.put(FlushMsProp, flushMs.toString)
     props.put(RetentionBytesProp, retentionSize.toString)
-    props.put(RententionMsProp, retentionMs.toString)
+    props.put(RetentionMsProp, retentionMs.toString)
     props.put(MaxMessageBytesProp, maxMessageSize.toString)
     props.put(IndexIntervalBytesProp, indexInterval.toString)
     props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
@@ -122,7 +120,7 @@ object LogConfig {
   val FlushMessagesProp = "flush.messages"
   val FlushMsProp = "flush.ms"
   val RetentionBytesProp = "retention.bytes"
-  val RententionMsProp = "retention.ms"
+  val RetentionMsProp = "retention.ms"
   val MaxMessageBytesProp = "max.message.bytes"
   val IndexIntervalBytesProp = "index.interval.bytes"
   val DeleteRetentionMsProp = "delete.retention.ms"
@@ -172,7 +170,8 @@ object LogConfig {
       .define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, FlushMsDoc)
       // can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize
       .define(RetentionBytesProp, LONG, Defaults.RetentionSize, MEDIUM, RetentionSizeDoc)
-      .define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0), MEDIUM, RetentionMsDoc)
+      // can be negative. See kafka.log.LogManager.cleanupExpiredSegments
+      .define(RetentionMsProp, LONG, Defaults.RetentionMs, MEDIUM, RetentionMsDoc)
       .define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, atLeast(0), MEDIUM, MaxMessageSizeDoc)
       .define(IndexIntervalBytesProp, INT, Defaults.IndexInterval, atLeast(0), MEDIUM,  IndexIntervalDoc)
       .define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM,
DeleteRetentionMsDoc)
@@ -206,7 +205,7 @@ object LogConfig {
                   flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long],
                   flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
                   retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long],
-                  retentionMs = parsed.get(RententionMsProp).asInstanceOf[Long],
+                  retentionMs = parsed.get(RetentionMsProp).asInstanceOf[Long],
                   maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
                   indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
                   fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long],

http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a7a9b85..e781eba 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -407,6 +407,8 @@ class LogManager(val logDirs: Array[File],
    * Runs through the log removing segments older than a certain age
    */
   private def cleanupExpiredSegments(log: Log): Int = {
+    if (log.config.retentionMs < 0)
+      return 0
     val startMs = time.milliseconds
     log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 69b772c..cfbbd2b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -417,7 +417,7 @@ object KafkaConfig {
 
       .define(LogRetentionTimeMillisProp, LONG, HIGH, LogRetentionTimeMillisDoc, false)
       .define(LogRetentionTimeMinutesProp, INT, HIGH, LogRetentionTimeMinsDoc, false)
-      .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, atLeast(1), HIGH,
LogRetentionTimeHoursDoc)
+      .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, HIGH, LogRetentionTimeHoursDoc)
 
       .define(LogRetentionBytesProp, LONG, Defaults.LogRetentionBytes, HIGH, LogRetentionBytesDoc)
       .define(LogCleanupIntervalMsProp, LONG, Defaults.LogCleanupIntervalMs, atLeast(1),
MEDIUM, LogCleanupIntervalMsDoc)
@@ -770,12 +770,16 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
 
-    _logRetentionTimeMillis.getOrElse(
-      _logRetentionTimeMins match {
-        case Some(mins) => millisInMinute * mins
-        case None => millisInHour * logRetentionTimeHours
-      }
-    )
+    val millis = {
+      _logRetentionTimeMillis.getOrElse(
+        _logRetentionTimeMins match {
+          case Some(mins) => millisInMinute * mins
+          case None => millisInHour * logRetentionTimeHours
+        }
+      )
+    }
+    if (millis < 0) return -1
+    millis
   }
 
   private def getMap(propName: String, propValue: String): Map[String, String] = {
@@ -834,8 +838,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must
be equal or greater than -1 and not greater than reserved.broker.max.id")
     require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater
than 0")
-    require(logRetentionTimeMillis >= 1, "log.retention.ms must be equal or greater than
1")
-    require(_logRetentionTimeMins.forall(_ >= 1), "log.retention.minutes must be equal
or greater than 1")
+
+    require(_logRetentionTimeMins.forall(_ >= 1)|| _logRetentionTimeMins.forall(_ .equals(-1)),
"log.retention.minutes must be unlimited (-1) or, equal or greater than 1")
+    require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours
must be unlimited (-1) or, equal or greater than 1")
+    require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms
must be unlimited (-1) or, equal or greater than 1")
 
     require(logDirs.size > 0)
     require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size
must be at least 1MB per cleaner thread.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/test/scala/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/kafka/log/LogConfigTest.scala
deleted file mode 100644
index 9690f14..0000000
--- a/core/src/test/scala/kafka/log/LogConfigTest.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import org.apache.kafka.common.config.ConfigException
-import org.scalatest.junit.JUnit3Suite
-import org.junit.{Assert, Test}
-import java.util.Properties
-
-class LogConfigTest extends JUnit3Suite {
-
-  @Test
-  def testFromPropsDefaults() {
-    val defaults = new Properties()
-    defaults.put(LogConfig.SegmentBytesProp, "4242")
-    val props = new Properties(defaults)
-
-    val config = LogConfig.fromProps(props)
-
-    Assert.assertEquals(4242, config.segmentSize)
-    Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize,
config.maxMessageSize)
-    Assert.assertEquals("producer", config.compressionType)
-  }
-
-  @Test
-  def testFromPropsEmpty() {
-    val p = new Properties()
-    val config = LogConfig.fromProps(p)
-    Assert.assertEquals(LogConfig(), config)
-  }
-
-  @Test
-  def testFromPropsToProps() {
-    import scala.util.Random._
-    val expected = new Properties()
-    LogConfig.configNames().foreach((name) => {
-      name match {
-        case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true",
"false"))
-        case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer",
"uncompressed", "gzip"))
-        case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact,
LogConfig.Delete))
-        case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble
* .9 + .1))
-        case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue
- 1) + 1).toString)
-        case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
-        case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
-      }
-    })
-
-    val actual = LogConfig.fromProps(expected).toProps
-    Assert.assertEquals(expected, actual)
-  }
-
-  @Test
-  def testFromPropsInvalid() {
-    LogConfig.configNames().foreach((name) => {
-      name match {
-        case LogConfig.UncleanLeaderElectionEnableProp  => return
-        case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
-        case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
-        case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number",
"-0.1", "1.2")
-        case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number",
"0", "-1")
-        case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
-      }
-    })
-   }
-
-  private def assertPropertyInvalid(name: String, values: AnyRef*) {
-    values.foreach((value) => {
-      val props = new Properties
-      props.setProperty(name, value.toString)
-      intercept[ConfigException] {
-        LogConfig.fromProps(props)
-      }
-    })
-  }
-
-  private def randFrom[T](choices: T*): T = {
-    import scala.util.Random
-    choices(Random.nextInt(choices.size))
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index cfe38df..4b728a1 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -370,7 +370,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging
{
     def makeConfig(messageSize: Int, retentionMs: Long) = {
       var props = new Properties()
       props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
-      props.setProperty(LogConfig.RententionMsProp, retentionMs.toString)
+      props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString)
       props
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
new file mode 100644
index 0000000..f3546ad
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.log
+
+import java.util.Properties
+
+import kafka.log.{Defaults, LogConfig}
+import org.apache.kafka.common.config.ConfigException
+import org.junit.{Assert, Test}
+import org.scalatest.junit.JUnit3Suite
+
+class LogConfigTest extends JUnit3Suite {
+
+  @Test
+  def testFromPropsDefaults() {
+    val defaults = new Properties()
+    defaults.put(LogConfig.SegmentBytesProp, "4242")
+    val props = new Properties(defaults)
+
+    val config = LogConfig.fromProps(props)
+
+    Assert.assertEquals(4242, config.segmentSize)
+    Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize,
config.maxMessageSize)
+    Assert.assertEquals("producer", config.compressionType)
+  }
+
+  @Test
+  def testFromPropsEmpty() {
+    val p = new Properties()
+    val config = LogConfig.fromProps(p)
+    Assert.assertEquals(LogConfig(), config)
+  }
+
+  @Test
+  def testFromPropsToProps() {
+    import scala.util.Random._
+    val expected = new Properties()
+    LogConfig.configNames().foreach((name) => {
+      name match {
+        case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true",
"false"))
+        case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer",
"uncompressed", "gzip"))
+        case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact,
LogConfig.Delete))
+        case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble
* .9 + .1))
+        case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue
- 1) + 1).toString)
+        case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
+        case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString)
+        case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
+      }
+    })
+
+    val actual = LogConfig.fromProps(expected).toProps
+    Assert.assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFromPropsInvalid() {
+    LogConfig.configNames().foreach((name) => {
+      name match {
+        case LogConfig.UncleanLeaderElectionEnableProp  => return
+        case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
+        case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" )
+        case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
+        case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number",
"-0.1", "1.2")
+        case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number",
"0", "-1")
+        case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
+      }
+    })
+   }
+
+  private def assertPropertyInvalid(name: String, values: AnyRef*) {
+    values.foreach((value) => {
+      val props = new Properties
+      props.setProperty(name, value.toString)
+      intercept[ConfigException] {
+        LogConfig.fromProps(props)
+      }
+    })
+  }
+
+  private def randFrom[T](choices: T*): T = {
+    import scala.util.Random
+    choices(Random.nextInt(choices.size))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3139cc31/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 40c265a..2428dbd 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -83,6 +83,59 @@ class KafkaConfigTest extends JUnit3Suite {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
+  @Test
+  def testLogRetentionUnlimited() {
+    val props1 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
+    val props2 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
+    val props3 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
+    val props4 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
+    val props5 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
+
+    props1.put("log.retention.ms", "-1")
+    props2.put("log.retention.minutes", "-1")
+    props3.put("log.retention.hours", "-1")
+
+    val cfg1 = KafkaConfig.fromProps(props1)
+    val cfg2 = KafkaConfig.fromProps(props2)
+    val cfg3 = KafkaConfig.fromProps(props3)
+    assertEquals("Should be -1", -1, cfg1.logRetentionTimeMillis)
+    assertEquals("Should be -1", -1, cfg2.logRetentionTimeMillis)
+    assertEquals("Should be -1", -1, cfg3.logRetentionTimeMillis)
+
+    props4.put("log.retention.ms", "-1")
+    props4.put("log.retention.minutes", "30")
+
+    val cfg4 = KafkaConfig.fromProps(props4)
+    assertEquals("Should be -1", -1, cfg4.logRetentionTimeMillis)
+
+    props5.put("log.retention.ms", "0")
+
+    intercept[IllegalArgumentException] {
+      val cfg5 = KafkaConfig.fromProps(props5)
+    }
+  }
+
+  @Test
+  def testLogRetentionValid {
+    val props1 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    val props2 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    val props3 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+
+    props1.put("log.retention.ms", "0")
+    props2.put("log.retention.minutes", "0")
+    props3.put("log.retention.hours", "0")
+
+    intercept[IllegalArgumentException] {
+      val cfg1 = KafkaConfig.fromProps(props1)
+    }
+    intercept[IllegalArgumentException] {
+      val cfg2 = KafkaConfig.fromProps(props2)
+    }
+    intercept[IllegalArgumentException] {
+      val cfg3 = KafkaConfig.fromProps(props3)
+    }
+
+  }
 
   @Test
   def testAdvertiseDefaults() {


Mime
View raw message