kafka-commits mailing list archives

From joest...@apache.org
Subject git commit: KAFKA-1289 Misc. nitpicks in log cleaner for new 0.8.1 features patch by Jay Kreps, reviewed by Sriram Subramanian and Jun Rao
Date Tue, 04 Mar 2014 20:30:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 40c6555eb -> 77118a935


KAFKA-1289 Misc. nitpicks in log cleaner for new 0.8.1 features patch by Jay Kreps, reviewed by Sriram Subramanian and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/77118a93
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/77118a93
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/77118a93

Branch: refs/heads/trunk
Commit: 77118a935ee28da80c67d4050f41a1e7e838ebaa
Parents: 40c6555
Author: Joe Stein <joe.stein@stealth.ly>
Authored: Tue Mar 4 15:30:59 2014 -0500
Committer: Joe Stein <joe.stein@stealth.ly>
Committed: Tue Mar 4 15:30:59 2014 -0500

----------------------------------------------------------------------
 config/log4j.properties                                     | 2 --
 config/server.properties                                    | 9 +++++----
 core/src/main/scala/kafka/log/CleanerConfig.scala           | 2 +-
 core/src/main/scala/kafka/log/LogCleaner.scala              | 7 ++++++-
 core/src/main/scala/kafka/log/LogCleanerManager.scala       | 5 ++++-
 core/src/main/scala/kafka/log/LogConfig.scala               | 8 ++++----
 core/src/main/scala/kafka/log/LogManager.scala              | 2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala          | 2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala          | 2 +-
 core/src/test/scala/other/kafka/TestLogCleaning.scala       | 9 +++++----
 core/src/test/scala/unit/kafka/log/CleanerTest.scala        | 2 +-
 .../scala/unit/kafka/log/LogCleanerIntegrationTest.scala    | 2 +-
 12 files changed, 30 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index 1ab8507..baa698b 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -73,8 +73,6 @@ log4j.additivity.kafka.controller=false
 
 log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
 log4j.additivity.kafka.log.LogCleaner=false
-log4j.logger.kafka.log.Cleaner=INFO, cleanerAppender
-log4j.additivity.kafka.log.Cleaner=false
 
 log4j.logger.state.change.logger=TRACE, stateChangeAppender
 log4j.additivity.state.change.logger=false

http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 2ffe0eb..c9e923a 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -40,7 +40,7 @@ port=9092
 num.network.threads=2
  
 # The number of threads doing disk I/O
-num.io.threads=2
+num.io.threads=8
 
 # The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer.bytes=1048576
@@ -100,6 +100,10 @@ log.segment.bytes=536870912
 # to the retention policies
 log.retention.check.interval.ms=60000
 
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
 ############################# Zookeeper #############################
 
 # Zookeeper connection string (see zookeeper docs for details).
@@ -111,6 +115,3 @@ zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=1000000
-
-
-log.cleanup.policy=delete

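For context on the new log.cleaner.enable switch above: enabling the cleaner broker-wide does not compact anything by itself; each log opts in via its cleanup.policy. Below is a minimal sketch, not part of this patch, of marking one topic for compaction with the 0.8.1 admin API (the topic name and ZooKeeper address are illustrative):

    import java.util.Properties
    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    // Kafka admin calls expect a ZkClient wired with ZKStringSerializer.
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    val topicConfig = new Properties()
    topicConfig.put("cleanup.policy", "compact") // per-log override; the broker default stays "delete"
    AdminUtils.createTopic(zkClient, "compacted-topic", 1, 1, topicConfig)
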
http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/core/src/main/scala/kafka/log/CleanerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala
index fa946ad..ade8386 100644
--- a/core/src/main/scala/kafka/log/CleanerConfig.scala
+++ b/core/src/main/scala/kafka/log/CleanerConfig.scala
@@ -35,7 +35,7 @@ case class CleanerConfig(val numThreads: Int = 1,
                          val ioBufferSize: Int = 1024*1024,
                          val maxMessageSize: Int = 32*1024*1024,
                          val maxIoBytesPerSecond: Double = Double.MaxValue,
-                         val backOffMs: Long = 60 * 1000,
+                         val backOffMs: Long = 15 * 1000,
                          val enableCleaner: Boolean = true,
                          val hashAlgorithm: String = "MD5") {
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 6404647..312204c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -131,6 +131,9 @@ class LogCleaner(val config: CleanerConfig,
    */
   private class CleanerThread(threadId: Int)
    extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
+    
+    override val loggerName = classOf[LogCleaner].getName
+    
     if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
       warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring
excess buffer space...")
 
@@ -185,7 +188,7 @@ class LogCleaner(val config: CleanerConfig,
     def logStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
       def mb(bytes: Double) = bytes / (1024*1024)
       val message = 
-        "%n\tLog cleaner %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name,
from, to) + 
+        "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id,
name, from, to) + 
         "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead),

                                                                                 stats.elapsedSecs,

                                                                                 mb(stats.bytesRead/stats.elapsedSecs))
+ 
@@ -222,6 +225,8 @@ private[log] class Cleaner(val id: Int,
                            throttler: Throttler,
                            time: Time,
                            checkDone: (TopicAndPartition) => Unit) extends Logging {
+  
+  override val loggerName = classOf[LogCleaner].getName
 
   this.logIdent = "Cleaner " + id + ": "
   

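The loggerName overrides added in this file (and the matching one in LogCleanerManager below) are what allow dropping the separate kafka.log.Cleaner entries from log4j.properties above: every cleaner helper now logs as kafka.log.LogCleaner, so the single cleanerAppender entry captures all cleaner output. A minimal sketch of the pattern, assuming only Kafka's Logging trait:

    import kafka.log.LogCleaner
    import kafka.utils.Logging

    // Logging derives loggerName from getClass by default; overriding it makes a
    // helper class report under the LogCleaner logger instead of its own name.
    class SomeCleanerHelper extends Logging {   // hypothetical helper, for illustration
      override val loggerName = classOf[LogCleaner].getName
      def touch() = info("logged under kafka.log.LogCleaner")
    }
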
http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 1612c8d..43e5c1f 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -40,6 +40,9 @@ private[log] case object LogCleaningPaused extends LogCleaningState
  *  requested to be resumed.
  */
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging {
+  
+  override val loggerName = classOf[LogCleaner].getName
+  
   /* the offset checkpoints holding the last cleaned point for each log */
   private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
 
@@ -65,7 +68,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val cleanableLogs = logs.filter(l => l._2.config.dedupe)                                    // skip any logs marked for delete rather than dedupe
+      val cleanableLogs = logs.filter(l => l._2.config.compact)                                   // skip any logs marked for delete rather than dedupe
                              .filterNot(l => inProgress.contains(l._1))                           // skip any logs already in-progress
                              .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0)))      // create a LogToClean instance for each
       val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)                                 // must have some bytes

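The filter chain in grabFilthiestLog above narrows the candidate set in three steps before a dirtiest log is chosen. A simplified, self-contained sketch of that selection, with CandidateLog as an illustrative stand-in for the real Pool[TopicAndPartition, Log] entries:

    // Stand-in type; the real code works over Pool[TopicAndPartition, Log] and LogToClean.
    case class CandidateLog(name: String, compact: Boolean, inProgress: Boolean, totalBytes: Long)

    def cleanableCandidates(logs: Seq[CandidateLog]): Seq[CandidateLog] =
      logs.filter(_.compact)         // skip logs whose cleanup policy is delete
          .filterNot(_.inProgress)   // skip logs a cleaner thread already holds
          .filter(_.totalBytes > 0)  // must have some bytes to clean
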
http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 0b32aee..18c86fe 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -34,7 +34,7 @@ import kafka.common._
  * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
 * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
 * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
- * @param dedupe Should old segments in this log be deleted or deduplicated?
+ * @param compact Should old segments in this log be deleted or deduplicated?
  */
 case class LogConfig(val segmentSize: Int = 1024*1024, 
                      val segmentMs: Long = Long.MaxValue,
@@ -48,7 +48,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
                      val fileDeleteDelayMs: Long = 60*1000,
                      val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
                      val minCleanableRatio: Double = 0.5,
-                     val dedupe: Boolean = false) {
+                     val compact: Boolean = false) {
   
   def toProps: Properties = {
     val props = new Properties()
@@ -65,7 +65,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
     props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
     props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
-    props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
+    props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
     props
   }
   
@@ -117,7 +117,7 @@ object LogConfig {
                   fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
                   deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
                   minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
-                  dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
+                  compact = props.getProperty(CleanupPolicyProp).trim.toLowerCase != "delete")
   }
   
   /**

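Note the deliberate asymmetry above: toProps now writes "compact", but fromProps treats any value other than "delete" as compaction, so configs still carrying the old "dedupe" string keep working. A minimal sketch of that parsing rule:

    // Mirrors the new fromProps logic: compaction is the complement of "delete".
    def parsesAsCompact(cleanupPolicy: String): Boolean =
      cleanupPolicy.trim.toLowerCase != "delete"

    assert(parsesAsCompact("compact"))
    assert(parsesAsCompact("dedupe"))     // legacy value still enables compaction
    assert(!parsesAsCompact(" Delete "))  // matching is trimmed and case-insensitive
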
http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 10062af..bcd2bb7 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -351,7 +351,7 @@ class LogManager(val logDirs: Array[File],
     debug("Beginning log cleanup...")
     var total = 0
     val startMs = time.milliseconds
-    for(log <- allLogs; if !log.config.dedupe) {
+    for(log <- allLogs; if !log.config.compact) {
       debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 04a5d39..b871843 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -137,7 +137,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor", 0.9d)
   
   /* the amount of time to sleep when there are no logs to clean */
-  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 30*1000, (0L, Long.MaxValue))
+  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 15*1000, (0L, Long.MaxValue))
   
   /* the minimum ratio of dirty log to total log for a log to eligible for cleaning */
   val logCleanerMinCleanRatio = props.getDouble("log.cleaner.min.cleanable.ratio", 0.5)

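This aligns the server-level default with the CleanerConfig default changed earlier in this patch; both back off 15 seconds when there is nothing to clean. A trivial check of that agreement, using the CleanerConfig case class from this diff:

    import kafka.log.CleanerConfig

    // Both defaults are 15 * 1000 ms after this patch.
    assert(CleanerConfig().backOffMs == 15 * 1000)
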
http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c606b50..feb2093 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -262,7 +262,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                      deleteRetentionMs = config.logCleanerDeleteRetentionMs,
                                      fileDeleteDelayMs = config.logDeleteDelayMs,
                                      minCleanableRatio = config.logCleanerMinCleanRatio,
-                                     dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
+                                     compact = config.logCleanupPolicy.trim.toLowerCase == "compact")
     val defaultProps = defaultLogConfig.toProps
     val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
     // read the log configurations from zookeeper

http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/core/src/test/scala/other/kafka/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
index 22b16e5..d20d132 100644
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -243,11 +243,11 @@ object TestLogCleaning {
                       percentDeletes: Int): File = {
     val producerProps = new Properties
     producerProps.setProperty("producer.type", "async")
-    producerProps.setProperty("broker.list", brokerUrl)
+    producerProps.setProperty("metadata.broker.list", brokerUrl)
     producerProps.setProperty("serializer.class", classOf[StringEncoder].getName)
     producerProps.setProperty("key.serializer.class", classOf[StringEncoder].getName)
     producerProps.setProperty("queue.enqueue.timeout.ms", "-1")
-    producerProps.setProperty("batch.size", 1000.toString)
+    producerProps.setProperty("batch.num.messages", 1000.toString)
     val producer = new Producer[String, String](new ProducerConfig(producerProps))
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt
@@ -275,8 +275,9 @@ object TestLogCleaning {
   def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
     val consumerProps = new Properties
     consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
-    consumerProps.setProperty("zk.connect", zkUrl)
-    consumerProps.setProperty("consumer.timeout.ms", (10*1000).toString)
+    consumerProps.setProperty("zookeeper.connect", zkUrl)
+    consumerProps.setProperty("consumer.timeout.ms", (20*1000).toString)
+    consumerProps.setProperty("auto.offset.reset", "smallest")
     new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 51cd94b..d10e4f4 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -33,7 +33,7 @@ import kafka.message._
 class CleanerTest extends JUnitSuite {
   
   val dir = TestUtils.tempDir()
-  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, dedupe=true)
+  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, compact=true)
   val time = new MockTime()
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/77118a93/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 1de3ef0..9aeb69d 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -101,7 +101,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
       val log = new Log(dir = dir,
-                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
+                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, compact = true),
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)

