kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1375360 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: consumer/ log/ network/ server/ utils/
Date Tue, 21 Aug 2012 01:10:01 GMT
Author: junrao
Date: Tue Aug 21 01:10:00 2012
New Revision: 1375360

URL: http://svn.apache.org/viewvc?rev=1375360&view=rev
Log:
Create a generic Kafka thread; patched by Yang Ye; reviewed by Jun Rao, Jay Kreps; KAFKA-56
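
For reference, the new base class factors the per-class Thread/AtomicBoolean/CountDownLatch
boilerplate out of each background thread: subclasses implement doWork(), which is invoked
in a loop until shutdown() is called. A minimal usage sketch (the subclass name below is
hypothetical, not part of this patch):

    import kafka.utils.ShutdownableThread

    // Hypothetical subclass: doWork() runs repeatedly until shutdown().
    class HeartbeatThread extends ShutdownableThread("heartbeat-thread") {
      override def doWork() {
        // one unit of work per iteration; a blocking call here is
        // unblocked by the interrupt() issued from shutdown()
        Thread.sleep(1000)
      }
    }

    // val t = new HeartbeatThread
    // t.start()
    // t.shutdown()  // sets isRunning to false, interrupts, awaits the latch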

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ShutdownableThread.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Tue Aug 21 01:10:00 2012
@@ -23,10 +23,9 @@ import kafka.cluster.{Cluster, Broker}
 import scala.collection.immutable
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils.ZkUtils._
-import kafka.utils.SystemTime
-import java.util.concurrent.CountDownLatch
+import kafka.utils.{ShutdownableThread, SystemTime}
+
 
 /**
  *  Usage:
@@ -42,46 +41,33 @@ class ConsumerFetcherManager(private val
   private val noLeaderPartitionSet = new mutable.HashSet[(String, Int)]
   private val lock = new ReentrantLock
   private val cond = lock.newCondition()
-  private val isShuttingDown = new AtomicBoolean(false)
-  private val leaderFinderThreadShutdownLatch = new CountDownLatch(1)  
-  private val leaderFinderThread = new Thread(consumerIdString + "_leader_finder_thread") {
+  private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){
     // thread responsible for adding the fetcher to the right broker when leader is available
-    override def run() {
-      info("starting %s".format(getName))
-      while (!isShuttingDown.get) {
-        try {
-          lock.lock()
-          try {
-            if (noLeaderPartitionSet.isEmpty)
-              cond.await()
-            for ((topic, partitionId) <- noLeaderPartitionSet) {
-              // find the leader for this partition
-              getLeaderForPartition(zkClient, topic, partitionId) match {
-                case Some(leaderId) =>
-                  cluster.getBroker(leaderId) match {
-                    case Some(broker) =>
-                      val pti = partitionMap((topic, partitionId))
-                      addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
-                      noLeaderPartitionSet.remove((topic, partitionId))
-                    case None =>
-                      error("Broker %d is unavailable, fetcher for topic %s partition %d
could not be started"
-                            .format(leaderId, topic, partitionId))
-                  }
-                case None => // let it go since we will keep retrying
+    override def doWork() {
+      lock.lock()
+      try {
+        if (noLeaderPartitionSet.isEmpty)
+          cond.await()
+        for ((topic, partitionId) <- noLeaderPartitionSet) {
+          // find the leader for this partition
+          getLeaderForPartition(zkClient, topic, partitionId) match {
+            case Some(leaderId) =>
+              cluster.getBroker(leaderId) match {
+                case Some(broker) =>
+                  val pti = partitionMap((topic, partitionId))
+                  addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
+                  noLeaderPartitionSet.remove((topic, partitionId))
+                case None =>
+                  error("Broker %d is unavailable, fetcher for topic %s partition %d could
not be started"
+                                .format(leaderId, topic, partitionId))
               }
-            }
-          } finally {
-            lock.unlock()
+            case None => // let it go since we will keep retrying
           }
-          Thread.sleep(config.refreshLeaderBackoffMs)
-        } catch {
-          case t =>
-            if (!isShuttingDown.get())
-              error("error in %s".format(getName), t)
         }
+      } finally {
+        lock.unlock()
       }
-      leaderFinderThreadShutdownLatch.countDown()
-      info("stopping %s".format(getName))
+      Thread.sleep(config.refreshLeaderBackoffMs)
     }
   }
   leaderFinderThread.start()
@@ -92,7 +78,7 @@ class ConsumerFetcherManager(private val
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
-    if (isShuttingDown.get)
+    if (!leaderFinderThread.isRunning.get())
       throw new RuntimeException("%s already shutdown".format(name))
     lock.lock()
     try {
@@ -146,9 +132,7 @@ class ConsumerFetcherManager(private val
 
   def shutdown() {
     info("shutting down")
-    isShuttingDown.set(true)
-    leaderFinderThread.interrupt()
-    leaderFinderThreadShutdownLatch.await()
+    leaderFinderThread.shutdown()
     stopAllConnections()
     info("shutdown completed")
   }

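A note on the refactored leader-finder loop above: doWork() can park indefinitely in
cond.await(), yet shutdown() still completes promptly, because ShutdownableThread.shutdown()
interrupts the thread; the interrupt makes await() throw InterruptedException, which run()
swallows once isRunning is false. A standalone sketch of that mechanism (not part of the
patch):

    import java.util.concurrent.locks.ReentrantLock

    object AwaitInterruptDemo {
      def main(args: Array[String]) {
        val lock = new ReentrantLock
        val cond = lock.newCondition()
        val t = new Thread {
          override def run() {
            lock.lock()
            try cond.await()  // parks until signalled or interrupted
            catch { case e: InterruptedException => println("woken by interrupt") }
            finally lock.unlock()
          }
        }
        t.start()
        Thread.sleep(100)
        t.interrupt()  // plays the role of shutdown()
        t.join()
      }
    }
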
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Tue Aug 21 01:10:00 2012
@@ -115,7 +115,7 @@ private[kafka] class ZookeeperConsumerCo
     }
     config.groupId + "_" + consumerUuid
   }
-  this.logIdent = consumerIdString + " "
+  this.logIdent = "[" + consumerIdString + "], "
 
   connectZk()
   createFetcher()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Tue Aug 21 01:10:00 2012
@@ -115,7 +115,7 @@ class LogSegment(val file: File, val mes
  */
 @threadsafe
 private[kafka] class Log( val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean, time: Time, brokerId: Int = 0) extends Logging {
-  this.logIdent = "Kafka Log on Broker " + brokerId + ", "
+  this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
 
   import kafka.log.Log._
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Tue Aug 21 01:10:00 2012
@@ -44,7 +44,7 @@ private[kafka] class LogManager(val conf
   private val logFlushIntervals = config.flushIntervalMap
   private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
   private val logRetentionSize = config.logRetentionSize
-  this.logIdent = "Log Manager on Broker " + config.brokerId + ", "
+  this.logIdent = "[Log Manager on Broker " + config.brokerId + "], "
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Tue Aug 21 01:10:00 2012
@@ -37,7 +37,7 @@ class SocketServer(val brokerId: Int,
                    val monitoringPeriodSecs: Int,
                    val maxQueuedRequests: Int,
                    val maxRequestSize: Int = Int.MaxValue) extends Logging {
-  this.logIdent = "Socket Server on Broker " + brokerId + ", "
+  this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
   private var acceptor: Acceptor = new Acceptor(port, processors)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala Tue Aug 21 01:10:00 2012
@@ -25,7 +25,7 @@ abstract class AbstractFetcherManager(pr
     // map of (source brokerid, fetcher Id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[(Int, Int), AbstractFetcherThread]
   private val mapLock = new Object
-  this.logIdent = name + " "
+  this.logIdent = "[" + name + "], "
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
     (topic.hashCode() + 31 * partitionId) % numFetchers
@@ -57,7 +57,7 @@ abstract class AbstractFetcherManager(pr
       for ((key, fetcher) <- fetcherThreadMap) {
         fetcher.removePartition(topic, partitionId)
         if (fetcher.partitionCount <= 0) {
-          fetcher.shutdown
+          fetcher.shutdown()
           fetcherThreadMap.remove(key)
         }
       }
@@ -76,7 +76,7 @@ abstract class AbstractFetcherManager(pr
   def closeAllFetchers() {
     mapLock synchronized {
       for ( (_, fetcher) <- fetcherThreadMap) {
-        fetcher.shutdown
+        fetcher.shutdown()
       }
       fetcherThreadMap.clear()
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Tue Aug 21 01:10:00 2012
@@ -17,29 +17,24 @@
 
 package kafka.server
 
-import java.util.concurrent.CountDownLatch
 import kafka.cluster.Broker
 import kafka.consumer.SimpleConsumer
-import java.util.concurrent.atomic.AtomicBoolean
-import kafka.utils.Logging
 import kafka.common.ErrorMapping
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
+import kafka.utils.ShutdownableThread
+
 
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class AbstractFetcherThread(val name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
+abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
-  extends Thread(name) with Logging {
-  private val isRunning: AtomicBoolean = new AtomicBoolean(true)
-  private val shutdownLatch = new CountDownLatch(1)
+  extends ShutdownableThread(name) {
   private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map
   private val fetchMapLock = new Object
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
-  this.logIdent = name + " "
-  info("starting")
   // callbacks to be defined in subclass
 
   // process fetched data
@@ -51,78 +46,75 @@ abstract class AbstractFetcherThread(val
   // deal with partitions with errors, potentially due to leadership changes
   def handlePartitionsWithErrors(partitions: Iterable[(String, Int)])
 
-  override def run() {
-    try {
-      while(isRunning.get) {
-        val builder = new FetchRequestBuilder().
-          clientId(name).
-          replicaId(fetcherBrokerId).
-          maxWait(maxWait).
-          minBytes(minBytes)
-
-        fetchMapLock synchronized {
-          for ( ((topic, partitionId), offset) <- fetchMap )
-            builder.addFetch(topic, partitionId, offset.longValue, fetchSize)
-        }
+  override def shutdown(){
+    super.shutdown()
+    simpleConsumer.close()
+  }
 
-        val fetchRequest = builder.build()
-        val partitionsWithError = new mutable.HashSet[(String, Int)]
-        var response: FetchResponse = null
-        try {
-          trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
-          response = simpleConsumer.fetch(fetchRequest)
-        } catch {
-          case t =>
-            debug("error in fetch %s".format(fetchRequest), t)
-            if (isRunning.get) {
-              fetchMapLock synchronized {
-                partitionsWithError ++= fetchMap.keys
-                fetchMap.clear()
-              }
-            }
-        }
+  override def doWork() {
+    val builder = new FetchRequestBuilder().
+            clientId(name).
+            replicaId(fetcherBrokerId).
+            maxWait(maxWait).
+            minBytes(minBytes)
+
+    fetchMapLock synchronized {
+      for ( ((topic, partitionId), offset) <- fetchMap )
+        builder.addFetch(topic, partitionId, offset.longValue, fetchSize)
+    }
 
-        if (response != null) {
-          // process fetched data
+    val fetchRequest = builder.build()
+    val partitionsWithError = new mutable.HashSet[(String, Int)]
+    var response: FetchResponse = null
+    try {
+      trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+      response = simpleConsumer.fetch(fetchRequest)
+    } catch {
+      case t =>
+        debug("error in fetch %s".format(fetchRequest), t)
+        if (isRunning.get) {
           fetchMapLock synchronized {
-            for ( topicData <- response.data ) {
-              for ( partitionData <- topicData.partitionDataArray) {
-                val topic = topicData.topic
-                val partitionId = partitionData.partition
-                val key = (topic, partitionId)
-                val currentOffset = fetchMap.get(key)
-                if (currentOffset.isDefined) {
-                  partitionData.error match {
-                    case ErrorMapping.NoError =>
-                      processPartitionData(topic, currentOffset.get, partitionData)
-                      val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
-                      fetchMap.put(key, newOffset)
-                    case ErrorMapping.OffsetOutOfRangeCode =>
-                      val newOffset = handleOffsetOutOfRange(topic, partitionId)
-                      fetchMap.put(key, newOffset)
-                      warn("current offset %d for topic %s partition %d out of range; reset
offset to %d"
-                        .format(currentOffset.get, topic, partitionId, newOffset))
-                    case _ =>
-                      error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
-                            ErrorMapping.exceptionFor(partitionData.error))
-                      partitionsWithError += key
-                      fetchMap.remove(key)
-                  }
-                }
+            partitionsWithError ++= fetchMap.keys
+            fetchMap.clear()
+          }
+        }
+    }
+
+    if (response != null) {
+      // process fetched data
+      fetchMapLock synchronized {
+        for ( topicData <- response.data ) {
+          for ( partitionData <- topicData.partitionDataArray) {
+            val topic = topicData.topic
+            val partitionId = partitionData.partition
+            val key = (topic, partitionId)
+            val currentOffset = fetchMap.get(key)
+            if (currentOffset.isDefined) {
+              partitionData.error match {
+                case ErrorMapping.NoError =>
+                  processPartitionData(topic, currentOffset.get, partitionData)
+                  val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+                  fetchMap.put(key, newOffset)
+                case ErrorMapping.OffsetOutOfRangeCode =>
+                  val newOffset = handleOffsetOutOfRange(topic, partitionId)
+                  fetchMap.put(key, newOffset)
+                  warn("current offset %d for topic %s partition %d out of range; reset offset
to %d"
+                               .format(currentOffset.get, topic, partitionId, newOffset))
+                case _ =>
+                  error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
+                        ErrorMapping.exceptionFor(partitionData.error))
+                  partitionsWithError += key
+                  fetchMap.remove(key)
               }
             }
           }
         }
-        if (partitionsWithError.size > 0) {
-          debug("handling partitions with error for %s".format(partitionsWithError))
-          handlePartitionsWithErrors(partitionsWithError)
-        }
       }
-    } catch {
-      case e: InterruptedException => info("interrupted. Shutting down")
-      case e1 => error("error in fetching", e1)
     }
-    shutdownComplete()
+    if (partitionsWithError.size > 0) {
+      debug("handling partitions with error for %s".format(partitionsWithError))
+      handlePartitionsWithErrors(partitionsWithError)
+    }
   }
 
   def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
@@ -148,16 +140,4 @@ abstract class AbstractFetcherThread(val
       fetchMap.size
     }
   }
-  
-  private def shutdownComplete() = {
-    simpleConsumer.close()
-    shutdownLatch.countDown
-  }
-
-  def shutdown() {
-    isRunning.set(false)
-    interrupt()
-    shutdownLatch.await()
-    info("shutdown completed")
-  }
 }
\ No newline at end of file

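Note the ordering in the overridden shutdown() above: super.shutdown() only returns after
run() has exited (it awaits the internal latch), so simpleConsumer.close() cannot race with
an in-flight fetch. Any subclass that owns a socket or similar resource can follow the same
shape; a sketch under that assumption (the class name is hypothetical):

    import kafka.utils.ShutdownableThread

    // Hypothetical pattern, not part of this patch: stop the loop first,
    // then release the resources the loop may still have been using.
    abstract class ResourceOwningThread(name: String) extends ShutdownableThread(name) {
      protected def closeResources(): Unit

      override def shutdown() {
        super.shutdown()  // interrupt, then wait for run() to finish
        closeResources()  // safe: doWork() can no longer touch the resources
      }
    }
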
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Tue Aug 21 01:10:00 2012
@@ -18,21 +18,21 @@
 package kafka.server
 
 import java.io.IOException
+import java.util.concurrent.TimeUnit
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
 import kafka.common._
 import kafka.log._
 import kafka.message._
 import kafka.network._
-import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
 import scala.math._
 import kafka.network.RequestChannel.Response
-import java.util.concurrent.TimeUnit
-import kafka.metrics.KafkaMetricsGroup
 import kafka.cluster.Replica
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Pool, ZkUtils, SystemTime, Logging}
 
 
 /**
@@ -47,12 +47,12 @@ class KafkaApis(val requestChannel: Requ
                 brokerId: Int) extends Logging {
 
   private val metricsGroup = brokerId.toString
-  private val producerRequestPurgatory = new ProducerRequestPurgatory
-  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
+  private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
+  private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
   private val delayedRequestMetrics = new DelayedRequestMetrics
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
-  this.logIdent = "KafkaApis-%d ".format(brokerId)
+  this.logIdent = "[KafkaApi on Broker " + brokerId + "], "
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -511,9 +511,9 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
+  class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
 
-    this.logIdent = "FetchRequestPurgatory-%d ".format(brokerId)
+    this.logIdent = "[etchRequestPurgatory-%d], ".format(brokerId)
 
     override def metricsGroupIdent = metricsGroup
 
@@ -677,10 +677,9 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A holding pen for produce requests waiting to be satisfied.
    */
-  private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
+  private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
 
-
-    this.logIdent = "ProducerRequestPurgatory-%d ".format(brokerId)
+    this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId)
 
     override def metricsGroupIdent = metricsGroup
 
@@ -700,8 +699,6 @@ class KafkaApis(val requestChannel: Requ
   }
 
   private class DelayedRequestMetrics {
-
-
     private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
       override def metricsGroupIdent = metricsGroup
       val caughtUpFollowerFetchRequestMeter =

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Tue Aug 21 01:10:00 2012
@@ -22,41 +22,29 @@ import collection.immutable.Set
 import kafka.cluster.Broker
 import kafka.api._
 import kafka.network.{Receive, BlockingChannel}
-import kafka.utils.{ZkUtils, Logging}
-import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, BlockingQueue}
+import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import collection.JavaConversions._
+import kafka.utils.{ShutdownableThread, ZkUtils, Logging}
 import java.lang.Object
 
+
 class RequestSendThread(val controllerId: Int,
                         val toBrokerId: Int,
                         val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                         val channel: BlockingChannel)
-        extends Thread("requestSendThread-" + toBrokerId) with Logging {
-  this.logIdent = "Controller %d, request send thread to broker %d, ".format(controllerId,
toBrokerId)
-  val isRunning: AtomicBoolean = new AtomicBoolean(true)
-  private val shutdownLatch = new CountDownLatch(1)
+        extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)){
   private val lock = new Object()
 
-  def shutdown(): Unit = {
-    info("shutting down")
-    isRunning.set(false)
-    interrupt()
-    shutdownLatch.await()
-    info("shutted down completed")
-  }
-
-  override def run(): Unit = {
-    try{
-      while(isRunning.get()){
-        val queueItem = queue.take()
-        val request = queueItem._1
-        val callback = queueItem._2
+  override def doWork(): Unit = {
+    val queueItem = queue.take()
+    val request = queueItem._1
+    val callback = queueItem._2
 
-        var receive: Receive = null
+    var receive: Receive = null
+    
         try{
           lock synchronized {
             channel.send(request)
@@ -80,12 +68,7 @@ class RequestSendThread(val controllerId
             debug("Exception occurs", e)
         }
       }
-    } catch{
-      case e: InterruptedException => warn("intterrupted. Shutting down")
-      case e1 => error("Error due to ", e1)
-    }
-    shutdownLatch.countDown()
-  }
+ 
 }
 
 class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) extends Logging{
@@ -93,8 +76,8 @@ class ControllerChannelManager(allBroker
   private val messageChannels = new HashMap[Int, BlockingChannel]
   private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
   private val messageThreads = new HashMap[Int, RequestSendThread]
-  private val lock = new Object()
-  this.logIdent = "Channel manager on controller " + config.brokerId + ", "
+  private val lock = new Object
+  this.logIdent = "[Channel manager on controller " + config.brokerId + "], "
   for(broker <- allBrokers){
     brokers.put(broker.id, broker)
     info("channel to broker " + broker.id + " created" + " at host: " + broker.host + " and
port: " + broker.port)
@@ -162,8 +145,8 @@ class ControllerChannelManager(allBroker
 }
 
 class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging {
-  this.logIdent = "Controller " + config.brokerId + ", "
-  info("startup")
+  this.logIdent = "[Controller " + config.brokerId + "], "
+  info("startup");
   private var isRunning = true
   private val controllerLock = new Object
   private var controllerChannelManager: ControllerChannelManager = null
@@ -266,7 +249,7 @@ class KafkaController(config : KafkaConf
   }
 
   class SessionExpireListener() extends IZkStateListener with Logging {
-    this.logIdent = "Controller " + config.brokerId + ", "
+    this.logIdent = "[Controller " + config.brokerId + "], "
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
       // do nothing, since zkclient will do reconnect for us.
@@ -459,7 +442,7 @@ class KafkaController(config : KafkaConf
   }
 
   class BrokerChangeListener() extends IZkChildListener with Logging {
-    this.logIdent = "Controller " + config.brokerId + ", "
+    this.logIdent = "[Controller " + config.brokerId + "], "
     def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) {
       controllerLock synchronized {
         info("broker change listener triggered")
@@ -507,7 +490,7 @@ class KafkaController(config : KafkaConf
   }
 
   class TopicChangeListener extends IZkChildListener with Logging {
-    this.logIdent = "Controller " + config.brokerId + ", "
+    this.logIdent = "[Controller " + config.brokerId + "], "
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
@@ -530,7 +513,7 @@ class KafkaController(config : KafkaConf
   }
 
   class ControllerExistListener extends IZkDataListener with Logging {
-    this.logIdent = "Controller " + config.brokerId + ", "
+    this.logIdent = "[Controller " + config.brokerId + "], "
 
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {

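The refactored RequestSendThread above pairs ShutdownableThread with a BlockingQueue:
queue.take() blocks while the queue is empty, and the interrupt() from shutdown() unblocks
it, ending the loop. A generic sketch of that pairing (names hypothetical, not part of this
patch):

    import java.util.concurrent.LinkedBlockingQueue
    import kafka.utils.ShutdownableThread

    class QueueDrainThread[T](queue: LinkedBlockingQueue[T], handle: T => Unit)
            extends ShutdownableThread("queue-drain-thread") {
      override def doWork() {
        val item = queue.take()  // an InterruptedException here ends the loop cleanly
        handle(item)
      }
    }
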
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Tue Aug 21 01:10:00 2012
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.Atomi
  * A thread that answers kafka requests.
  */
 class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
-  this.logIdent = "Kafka Request Handler " + id + " on Broker " + brokerId + ", "
+  this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
 
   def run() { 
     while(true) { 
@@ -46,7 +46,7 @@ class KafkaRequestHandlerPool(val broker
                               val requestChannel: RequestChannel,
                               val apis: KafkaApis,
                               numThreads: Int) extends Logging {
-  this.logIdent = "Kafka Request Handler on Broker " + brokerId + ", "
+  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
   for(i <- 0 until numThreads) { 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Aug 21 01:10:00 2012
@@ -34,7 +34,7 @@ import org.I0Itec.zkclient.ZkClient
  * to start up and shutdown a single Kafka node.
  */
 class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
-  this.logIdent = "Kafka Server " + config.brokerId + ", "
+  this.logIdent = "[Kafka Server " + config.brokerId + "], "
   val CleanShutdownFile = ".kafka_cleanshutdown"
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1375360&r1=1375359&r2=1375360&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Tue Aug 21 01:10:00 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ShutdownableThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ShutdownableThread.scala?rev=1375360&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ShutdownableThread.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ShutdownableThread.scala Tue Aug 21 01:10:00 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.CountDownLatch
+
+abstract class ShutdownableThread(val name: String)
+        extends Thread(name) with Logging {
+  this.setDaemon(false)
+  this.logIdent = "[" + name + "], "
+  val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  private val shutdownLatch = new CountDownLatch(1)
+
+
+  def shutdown(): Unit = {
+    info("Shutting down")
+    isRunning.set(false)
+    interrupt()
+    shutdownLatch.await()
+    info("Shutted down completed")
+  }
+
+  /**
+   * After calling shutdown(), use this API to wait until the shutdown is complete
+   */
+  def awaitShutdown(): Unit = shutdownLatch.await()
+
+  def doWork(): Unit
+
+  override def run(): Unit = {
+    info("Starting ")
+    try{
+      while(isRunning.get()){
+        doWork()
+      }
+    } catch{
+      case e: Throwable =>
+        if(isRunning.get())
+          error("Error due to ", e)
+    }
+    shutdownLatch.countDown()
+    info("Stopped ")
+  }
+}
\ No newline at end of file


