kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1453; Add a channel queue jmx in Mirror Maker; patched by Guozhang Wang; reviewed by Jun Rao
Date Fri, 16 May 2014 04:26:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8d454f374 -> b4c1089fa


kafka-1453; Add a channel queue jmx in Mirror Maker; patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b4c1089f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b4c1089f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b4c1089f

Branch: refs/heads/trunk
Commit: b4c1089fa963ec204dc350693a6a69dc929c99c3
Parents: 8d454f3
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Thu May 15 21:26:41 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu May 15 21:26:41 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 59 ++++++++++++++++----
 1 file changed, 48 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b4c1089f/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 26730c4..19df3d5 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,7 +17,7 @@
 
 package kafka.tools
 
-import kafka.utils.{Utils, CommandLineUtils, Logging}
+import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging}
 import kafka.consumer._
 import kafka.serializer._
 import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer}
@@ -26,9 +26,11 @@ import org.apache.kafka.clients.producer.ProducerRecord
 import scala.collection.mutable.ListBuffer
 import scala.collection.JavaConversions._
 
-import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch}
+import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch}
 
 import joptsimple.OptionParser
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
 object MirrorMaker extends Logging {
 
@@ -73,7 +75,8 @@ object MirrorMaker extends Logging {
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1)
 
-    val bufferSizeOpt =  parser.accepts("queue.size", "Number of messages that are buffered between the consumer and producer")
+    val bufferSizeOpt =  parser.accepts("queue.size",
+      "Number of messages that are buffered between the consumer and producer")
       .withRequiredArg()
       .describedAs("Queue size in terms of number of messages")
       .ofType(classOf[java.lang.Integer])
@@ -114,7 +117,7 @@ object MirrorMaker extends Logging {
     val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
 
     // create data channel
-    val mirrorDataChannel = new ArrayBlockingQueue[ProducerRecord](bufferSize)
+    val mirrorDataChannel = new DataChannel(bufferSize)
 
     // create producer threads
     val producers = (1 to numProducers).map(_ => {
@@ -178,11 +181,46 @@ object MirrorMaker extends Logging {
     info("Kafka mirror maker shutdown successfully")
   }
 
+  class DataChannel(capacity: Int) extends KafkaMetricsGroup {
+
+    val queue = new ArrayBlockingQueue[ProducerRecord](capacity)
+
+    newGauge(
+      "MirrorMaker-DataChannel-Size",
+      new Gauge[Int] {
+        def value = queue.size
+      }
+    )
+
+    private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.MILLISECONDS)
+    private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.MILLISECONDS)
+
+
+    def put(record: ProducerRecord) {
+      var putSucceed = false
+      while (!putSucceed) {
+        val startPutTime = SystemTime.milliseconds
+        putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
+        waitPut.mark(SystemTime.milliseconds - startPutTime)
+      }
+    }
+
+    def take(): ProducerRecord = {
+      var data: ProducerRecord = null
+      while (data == null) {
+        val startTakeTime = SystemTime.milliseconds
+        data = queue.poll(500, TimeUnit.MILLISECONDS)
+        waitTake.mark(SystemTime.milliseconds - startTakeTime)
+      }
+      data
+    }
+  }
+
   class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
-                          mirrorDataChannel: BlockingQueue[ProducerRecord],
-                          producers: Seq[BaseProducer],
-                          threadId: Int)
-          extends Thread with Logging {
+                       mirrorDataChannel: DataChannel,
+                       producers: Seq[BaseProducer],
+                       threadId: Int)
+          extends Thread with Logging with KafkaMetricsGroup {
 
     private val shutdownLatch = new CountDownLatch(1)
     private val threadName = "mirrormaker-consumer-" + threadId
@@ -226,9 +264,9 @@ object MirrorMaker extends Logging {
     }
   }
 
-  class ProducerThread (val dataChannel: BlockingQueue[ProducerRecord],
+  class ProducerThread (val dataChannel: DataChannel,
                         val producer: BaseProducer,
-                        val threadId: Int) extends Thread with Logging {
+                        val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
     private val threadName = "mirrormaker-producer-" + threadId
     private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
     this.logIdent = "[%s] ".format(threadName)
@@ -241,7 +279,6 @@ object MirrorMaker extends Logging {
         while (true) {
           val data: ProducerRecord = dataChannel.take
           trace("Sending message with value size %d".format(data.value().size))
-
           if(data eq shutdownMessage) {
             info("Received shutdown message")
             return


Mime
View raw message