kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1363542 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/LogManager.scala main/scala/kafka/server/KafkaApis.scala main/scala/kafka/server/KafkaServer.scala test/scala/unit/kafka/integration/TopicMetadataTest.scala
Date Thu, 19 Jul 2012 21:02:16 GMT
Author: jkreps
Date: Thu Jul 19 21:02:15 2012
New Revision: 1363542

URL: http://svn.apache.org/viewvc?rev=1363542&view=rev
Log:
KAFKA-371 Refactoring of LogManager. Reviewed by Neha.


Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1363542&r1=1363541&r2=1363542&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Thu Jul 19 21:02:15 2012
@@ -1,5 +1,5 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ 	* Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
@@ -40,12 +40,10 @@ private[kafka] class LogManager(val conf
   private val numPartitions = config.numPartitions
   private val maxSize: Long = config.logFileSize
   private val flushInterval = config.flushInterval
-  private val topicPartitionsMap = config.topicPartitionsMap
   private val logCreationLock = new Object
-  private val startupLatch: CountDownLatch = new CountDownLatch(1)
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
-  private val logFlushIntervalMap = config.flushIntervalMap
-  private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
+  private val logFlushIntervals = config.flushIntervalMap
+  private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
   private val logRetentionSize = config.logRetentionSize
   private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
 
@@ -72,19 +70,11 @@ private[kafka] class LogManager(val conf
       }
     }
   }
-  
-  private def getLogRetentionMSMap(logRetentionHourMap: Map[String, Int]) : Map[String, Long] = {
-    var ret = new mutable.HashMap[String, Long]
-    for ( (topic, hour) <- logRetentionHourMap )
-      ret.put(topic, hour * 60 * 60 * 1000L)
-    ret
-  }
 
   /**
-   *  Register this broker in ZK for the first time.
+   *  Start the log flush thread
    */
   def startup() {
-
     /* Schedule the cleanup task to delete old logs */
     if(scheduler != null) {
       if(scheduler.hasShutdown) {
@@ -96,13 +86,8 @@ private[kafka] class LogManager(val conf
     }
 
     if(logFlusherScheduler.hasShutdown) logFlusherScheduler.startUp
-    info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
+    info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervals)
     logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
-    startupLatch.countDown
-  }
-
-  private def awaitStartup() {
-    startupLatch.await
   }
 
 
@@ -112,9 +97,9 @@ private[kafka] class LogManager(val conf
   private def createLog(topic: String, partition: Int): Log = {
     if (topic.length <= 0)
      throw new InvalidTopicException("topic name can't be empty")
-    if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
+    if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
       val error = "Wrong partition %d, valid partitions (0, %d)."
-        .format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
+        .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
       warn(error)
       throw new InvalidPartitionException(error)
     }
@@ -125,22 +110,6 @@ private[kafka] class LogManager(val conf
     }
   }
 
-  /**
-   * Return the Pool (partitions) for a specific log
-   */
-  private def getLogPool(topic: String, partition: Int): Pool[Int, Log] = {
-    awaitStartup
-    if (topic.length <= 0)
-      throw new InvalidTopicException("topic name can't be empty")
-    if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
-      val error = "Wrong partition %d, valid partitions (0, %d)."
-        .format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
-      warn(error)
-      throw new InvalidPartitionException(error)
-    }
-    logs.get(topic)
-  }
-
   def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
     val log = getLog(offsetRequest.topic, offsetRequest.partition)
     log match {
@@ -150,10 +119,10 @@ private[kafka] class LogManager(val conf
   }
 
   /**
-   * Get the log if exists
+   * Get the log if it exists
    */
   def getLog(topic: String, partition: Int): Option[Log] = {
-    val parts = getLogPool(topic, partition)
+    val parts = logs.get(topic)
     if (parts == null) None
     else {
       val log = parts.get(partition)
@@ -167,7 +136,7 @@ private[kafka] class LogManager(val conf
    */
   def getOrCreateLog(topic: String, partition: Int): Log = {
     var hasNewTopic = false
-    var parts = getLogPool(topic, partition)
+    var parts = logs.get(topic)
     if (parts == null) {
       val found = logs.putIfNotExists(topic, new Pool[Int, Log])
       if (found == null)
@@ -195,8 +164,8 @@ private[kafka] class LogManager(val conf
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
     val topic = Utils.getTopicPartition(log.name)._1
-    val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
-    val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
+    val logCleanupThresholdMs = logRetentionMs.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+    val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMs)
     val total = log.deleteSegments(toBeDeleted)
     total
   }
@@ -226,11 +195,9 @@ private[kafka] class LogManager(val conf
    */
   def cleanupLogs() {
     debug("Beginning log cleanup...")
-    val iter = getLogIterator
     var total = 0
     val startMs = time.milliseconds
-    while(iter.hasNext) {
-      val log = iter.next
+    for(log <- allLogs) {
       debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
@@ -245,41 +212,23 @@ private[kafka] class LogManager(val conf
     info("Closing log manager")
     scheduler.shutdown()
     logFlusherScheduler.shutdown()
-    val iter = getLogIterator
-    while(iter.hasNext)
-      iter.next.close()
+    allLogs.foreach(_.close())
   }
   
-  private def getLogIterator(): Iterator[Log] = {
-    new IteratorTemplate[Log] {
-      val partsIter = logs.values.iterator
-      var logIter: Iterator[Log] = null
-
-      override def makeNext(): Log = {
-        while (true) {
-          if (logIter != null && logIter.hasNext)
-            return logIter.next
-          if (!partsIter.hasNext)
-            return allDone
-          logIter = partsIter.next.values.iterator
-        }
-        // should never reach here
-        assert(false)
-        return allDone
-      }
-    }
-  }
+  /**
+   * Get all the partition logs
+   */
+  def allLogs() = logs.values.flatMap(_.values)
 
   private def flushAllLogs() = {
     debug("flushing the high watermark of all logs")
-
-    for (log <- getLogIterator)
+    for (log <- allLogs)
     {
       try{
         val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
         var logFlushInterval = config.defaultFlushIntervalMs
-        if(logFlushIntervalMap.contains(log.topicName))
-          logFlushInterval = logFlushIntervalMap(log.topicName)
+        if(logFlushIntervals.contains(log.topicName))
+          logFlushInterval = logFlushIntervals(log.topicName)
         debug(log.topicName + " flush interval  " + logFlushInterval +
             " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
@@ -299,8 +248,6 @@ private[kafka] class LogManager(val conf
   }
 
 
-  def getAllTopics(): Iterator[String] = logs.keys.iterator
-  def getTopicPartitionsMap() = topicPartitionsMap
+  def topics(): Iterable[String] = logs.keys
 
-  def getServerConfig: KafkaConfig = config
 }
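
For readers following the refactor, here is a standalone sketch (not the committed
code) of the two idioms the LogManager hunks above introduce: deriving the per-topic
retention map with a single map call instead of a helper filling a mutable HashMap,
and enumerating all partition logs with flatMap instead of the hand-rolled
IteratorTemplate. Kafka's Pool is approximated with plain immutable Maps, and the
topic names are made up for illustration.

    object LogManagerIdiomsSketch {
      case class DemoLog(name: String)

      def main(args: Array[String]): Unit = {
        // hours-to-ms conversion, as in the new logRetentionMs initializer
        val logRetentionHoursMap: Map[String, Int] = Map("orders" -> 24, "clicks" -> 1)
        val logRetentionMs: Map[String, Long] =
          logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
        println(logRetentionMs("orders"))  // 86400000

        // topic -> (partition -> log), standing in for Pool[String, Pool[Int, Log]]
        val logs: Map[String, Map[Int, DemoLog]] =
          Map("orders" -> Map(0 -> DemoLog("orders-0"), 1 -> DemoLog("orders-1")))

        // the new allLogs: flatten the nested pools into one Iterable[DemoLog],
        // replacing the iterator that walked the two levels explicitly
        val allLogs: Iterable[DemoLog] = logs.values.flatMap(_.values)
        allLogs.foreach(log => println(log.name))  // orders-0, orders-1
      }
    }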

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1363542&r1=1363541&r2=1363542&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Jul 19 21:02:15 2012
@@ -381,7 +381,6 @@ class KafkaApis(val requestChannel: Requ
       requestLogger.trace("Topic metadata request " + metadataRequest.toString())
 
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
-    val config = logManager.getServerConfig
     val zkClient = kafkaZookeeper.getZookeeperClient
     val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
 
@@ -391,6 +390,7 @@ class KafkaApis(val requestChannel: Requ
         case Some(metadata) => topicsMetadata += metadata
         case None =>
           /* check if auto creation of topics is turned on */
+          val config = logManager.config
           if(config.autoCreateTopics) {
             CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
            info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
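
The dropped getServerConfig accessor is redundant because LogManager declares its
constructor parameter as a val, which already generates a public config getter; the
new KafkaApis code above reads it directly. A minimal sketch of that Scala idiom,
with simplified stand-in types rather than the real KafkaConfig:

    object ConfigAccessorSketch {
      // simplified stand-in for KafkaConfig
      case class DemoConfig(autoCreateTopics: Boolean, numPartitions: Int)

      // `val config` in the parameter list generates a public accessor,
      // so a separate getServerConfig method adds nothing
      class DemoLogManager(val config: DemoConfig)

      def main(args: Array[String]): Unit = {
        val manager = new DemoLogManager(DemoConfig(autoCreateTopics = true, numPartitions = 8))
        println(manager.config.autoCreateTopics)  // true
      }
    }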

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1363542&r1=1363541&r2=1363542&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Thu Jul 19 21:02:15 2012
@@ -64,6 +64,7 @@ class KafkaServer(val config: KafkaConfi
                                 1000L * 60 * config.logCleanupIntervalMinutes,
                                 1000L * 60 * 60 * config.logRetentionHours,
                                 needRecovery)
+    logManager.startup()
                                                 
     socketServer = new SocketServer(config.port,
                                     config.numNetworkThreads,
@@ -78,19 +79,12 @@ class KafkaServer(val config: KafkaConfi
 
     apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
     requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
-    socketServer.startup
+    socketServer.startup()
 
     Mx4jLoader.maybeLoad
 
-    /**
-     *  Registers this broker in ZK. After this, consumers can connect to broker.
-     *  So this should happen after socket server start.
-     */
-    logManager.startup
-
     // starting relevant replicas and leader election for partitions assigned to this broker
-    kafkaZookeeper.startup
-
+    kafkaZookeeper.startup()
     kafkaController.startup()
 
     info("Server started.")
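
The reordering works because LogManager.startup no longer registers the broker in
ZK; it only schedules the cleaner and flusher threads, so it can run before the
socket server accepts connections, and the broker becomes discoverable only when
kafkaZookeeper.startup() runs. A hedged sketch of that ordering constraint, with
illustrative stand-in components rather than the real server classes:

    object StartupOrderSketch {
      class DemoComponent(name: String) {
        def startup(): Unit = println("started " + name)
      }

      def main(args: Array[String]): Unit = {
        val logManager   = new DemoComponent("log manager")      // schedules background threads only
        val socketServer = new DemoComponent("socket server")    // starts accepting client traffic
        val zookeeper    = new DemoComponent("zookeeper client") // registration makes the broker visible

        logManager.startup()    // safe first: no externally visible side effects anymore
        socketServer.startup()  // the broker can now serve requests
        zookeeper.startup()     // advertise last, once the server is ready
      }
    }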

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1363542&r1=1363541&r2=1363542&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Thu Jul 19 21:02:15 2012
@@ -83,7 +83,7 @@ class TopicMetadataTest extends JUnit3Su
     val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
     val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
-    EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
+    EasyMock.expect(logManager.config).andReturn(configs.head)
     EasyMock.replay(logManager)
     EasyMock.replay(kafkaZookeeper)
 
@@ -118,7 +118,6 @@ class TopicMetadataTest extends JUnit3Su
     assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
 
     // verify the expected calls to log manager occurred in the right order
-    EasyMock.verify(logManager)
     EasyMock.verify(kafkaZookeeper)
     EasyMock.verify(receivedRequest)
   }
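
On the test side the expectation simply targets the new accessor. A sketch of
stubbing such a val-style accessor with EasyMock, using simplified stand-in types;
this assumes the generated getter is non-final (true for a plain Scala constructor
val on a non-final class), which is what lets the class-mocking proxy override it:

    import org.easymock.EasyMock

    object MockAccessorSketch {
      class DemoConfig { val brokerId: Int = 0 }
      class DemoLogManager(val config: DemoConfig)  // accessor to be stubbed

      def main(args: Array[String]): Unit = {
        val manager = EasyMock.createMock(classOf[DemoLogManager])
        EasyMock.expect(manager.config).andReturn(new DemoConfig)
        EasyMock.replay(manager)

        println(manager.config.brokerId)  // 0, served by the recorded expectation
        EasyMock.verify(manager)          // fails if config was never read
      }
    }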


