kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1204768 - in /incubator/kafka/trunk/core/src/main/scala/kafka/producer/async: AsyncProducer.scala AsyncProducerStats.scala AsyncProducerStatsMBean.scala
Date Tue, 22 Nov 2011 00:58:40 GMT
Author: junrao
Date: Tue Nov 22 00:58:39 2011
New Revision: 1204768

URL: http://svn.apache.org/viewvc?rev=1204768&view=rev
Log:
AsyncProducerStats is not a singleton; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-207

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala?rev=1204768&r1=1204767&r2=1204768&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducer.scala Tue Nov 22 00:58:39 2011
@@ -23,8 +23,6 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.log4j.{Level, Logger}
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
-import java.lang.management.ManagementFactory
-import javax.management.ObjectName
 import java.util.{Random, Properties}
 import kafka.producer.{ProducerConfig, SyncProducer}
 
@@ -32,6 +30,7 @@ object AsyncProducer {
   val Shutdown = new Object
   val Random = new Random
   val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats"
+  val ProducerQueueSizeMBeanName = "kafka.producer.Producer:type=AsyncProducerQueueSizeStats"
 }
 
 private[kafka] class AsyncProducer[T](config: AsyncProducerConfig,
@@ -49,22 +48,14 @@ private[kafka] class AsyncProducer[T](co
     eventHandler.init(eventHandlerProps)
   if(cbkHandler != null)
     cbkHandler.init(cbkHandlerProps)
-  private val sendThread = new ProducerSendThread("ProducerSendThread-" + AsyncProducer.Random.nextInt, queue,
+  private val asyncProducerID = AsyncProducer.Random.nextInt
+  private val sendThread = new ProducerSendThread("ProducerSendThread-" + asyncProducerID, queue,
     serializer, producer,
     if(eventHandler != null) eventHandler else new DefaultEventHandler[T](new ProducerConfig(config.props), cbkHandler),
     cbkHandler, config.queueTime, config.batchSize, AsyncProducer.Shutdown)
   sendThread.setDaemon(false)
-
-  val asyncProducerStats = new AsyncProducerStats[T](queue)
-  val mbs = ManagementFactory.getPlatformMBeanServer
-  try {
-    val objName = new ObjectName(AsyncProducer.ProducerMBeanName)
-    if(mbs.isRegistered(objName))
-      mbs.unregisterMBean(objName)
-    mbs.registerMBean(asyncProducerStats, objName)
-  }catch {
-    case e: Exception => logger.warn("can't register AsyncProducerStats")
-  }
+  Utils.swallow(logger.warn, Utils.registerMBean(
+    new AsyncProducerQueueSizeStats[T](queue), AsyncProducer.ProducerQueueSizeMBeanName + "-" + asyncProducerID))
 
   def this(config: AsyncProducerConfig) {
     this(config,
@@ -81,7 +72,7 @@ private[kafka] class AsyncProducer[T](co
   def send(topic: String, event: T) { send(topic, event, ProducerRequest.RandomPartition) }
 
   def send(topic: String, event: T, partition:Int) {
-    asyncProducerStats.recordEvent
+    AsyncProducerStats.recordEvent
 
     if(closed.get)
       throw new QueueClosedException("Attempt to add event to a closed queue.")
@@ -116,7 +107,7 @@ private[kafka] class AsyncProducer[T](co
       cbkHandler.afterEnqueue(data, added)
 
     if(!added) {
-      asyncProducerStats.recordDroppedEvents
+      AsyncProducerStats.recordDroppedEvents
       logger.error("Event queue is full of unsent messages, could not send event: " + event.toString)
       throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event.toString)
     }else {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala?rev=1204768&r1=1204767&r2=1204768&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala Tue Nov 22 00:58:39 2011
@@ -18,13 +18,15 @@
 package kafka.producer.async
 
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, ScheduledThreadPoolExecutor, ScheduledExecutorService, BlockingQueue}
+import java.util.concurrent.BlockingQueue
+import org.apache.log4j.Logger
+import kafka.utils.Utils
 
-class AsyncProducerStats[T](queue: BlockingQueue[QueueItem[T]]) extends AsyncProducerStatsMBean {
+class AsyncProducerStats extends AsyncProducerStatsMBean {
   val droppedEvents = new AtomicInteger(0)
   val numEvents = new AtomicInteger(0)
 
-  def getAsyncProducerQueueSize: Int = queue.size
+  def getAsyncProducerEvents: Int = numEvents.get
 
   def getAsyncProducerDroppedEvents: Int = droppedEvents.get
 
@@ -32,3 +34,17 @@ class AsyncProducerStats[T](queue: Block
 
   def recordEvent = numEvents.getAndAdd(1)
 }
+
+class AsyncProducerQueueSizeStats[T](private val queue: BlockingQueue[QueueItem[T]]) extends AsyncProducerQueueSizeStatsMBean {
+  def getAsyncProducerQueueSize: Int = queue.size
+}
+
+object AsyncProducerStats {
+  private val logger = Logger.getLogger(getClass())
+  private val stats = new AsyncProducerStats
+  Utils.swallow(logger.warn, Utils.registerMBean(stats, AsyncProducer.ProducerMBeanName))
+
+  def recordDroppedEvents = stats.recordDroppedEvents
+
+  def recordEvent = stats.recordEvent
+}

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala?rev=1204768&r1=1204767&r2=1204768&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala Tue Nov 22 00:58:39 2011
@@ -18,6 +18,10 @@
 package kafka.producer.async
 
 trait AsyncProducerStatsMBean {
-  def getAsyncProducerQueueSize: Int
+  def getAsyncProducerEvents: Int
   def getAsyncProducerDroppedEvents: Int
 }
+
+trait AsyncProducerQueueSizeStatsMBean {
+  def getAsyncProducerQueueSize: Int
+}



Mime
View raw message