kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject svn commit: r1396336 - in /incubator/kafka/branches/0.8: config/ core/lib/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/controller/ core/src/main/scala/kafka/log/ core/src/main/scala/kafka/metrics/ co...
Date Tue, 09 Oct 2012 21:28:21 GMT
Author: jjkoshy
Date: Tue Oct  9 21:28:20 2012
New Revision: 1396336

URL: http://svn.apache.org/viewvc?rev=1396336&view=rev
Log:
Upgrade the metrics jar to 3.x to pick up csv reporter fixes; KAFKA-542; patched by Joel Koshy;
reviewed by Neha Narkhede.

Added:
    incubator/kafka/branches/0.8/core/lib/metrics-annotation-3.0.0-10ccc80c.jar   (with props)
    incubator/kafka/branches/0.8/core/lib/metrics-core-3.0.0-10ccc80c.jar   (with props)
Modified:
    incubator/kafka/branches/0.8/config/server.properties
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
    incubator/kafka/branches/0.8/project/build/KafkaProject.scala

Modified: incubator/kafka/branches/0.8/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/config/server.properties?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/config/server.properties (original)
+++ incubator/kafka/branches/0.8/config/server.properties Tue Oct  9 21:28:20 2012
@@ -113,3 +113,10 @@ zk.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zk.connectiontimeout.ms=1000000
+
+# metrics reporter properties
+# kafka.metrics.polling.interval.secs=5
+# kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
+# kafka.csv.metrics.dir=kafka_metrics
+# kafka.csv.metrics.reporter.enabled=true
+

Added: incubator/kafka/branches/0.8/core/lib/metrics-annotation-3.0.0-10ccc80c.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/lib/metrics-annotation-3.0.0-10ccc80c.jar?rev=1396336&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/branches/0.8/core/lib/metrics-annotation-3.0.0-10ccc80c.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/branches/0.8/core/lib/metrics-core-3.0.0-10ccc80c.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/lib/metrics-core-3.0.0-10ccc80c.jar?rev=1396336&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/branches/0.8/core/lib/metrics-core-3.0.0-10ccc80c.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Tue Oct  9 21:28:20 2012
@@ -50,7 +50,7 @@ class Partition(val topic: String,
   newGauge(
     topic + "-" + partitionId + "UnderReplicated",
     new Gauge[Int] {
-      def value() = {
+      def getValue = {
         if (isUnderReplicated) 1 else 0
       }
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Tue Oct  9 21:28:20 2012
@@ -686,7 +686,7 @@ private[kafka] class ZookeeperConsumerCo
       newGauge(
         config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
         new Gauge[Int] {
-          def value() = q.size
+          def getValue = q.size
         }
       )
     })

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala Tue Oct  9 21:28:20 2012
@@ -50,7 +50,7 @@ class KafkaController(val config : Kafka
   newGauge(
     "ActiveControllerCount",
     new Gauge[Int] {
-      def value() = if (isActive) 1 else 0
+      def getValue = if (isActive) 1 else 0
     }
   )
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Tue Oct  9 21:28:20 2012
@@ -128,10 +128,10 @@ private[kafka] class Log(val dir: File, 
   private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
 
   newGauge(name + "-" + "NumLogSegments",
-           new Gauge[Int] { def value() = numberOfSegments })
+           new Gauge[Int] { def getValue = numberOfSegments })
 
   newGauge(name + "-" + "LogEndOffset",
-           new Gauge[Long] { def value() = logEndOffset })
+           new Gauge[Long] { def getValue = logEndOffset })
 
   /* The name of this log */
   def name  = dir.getName()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala Tue Oct  9 21:28:20 2012
@@ -50,9 +50,10 @@ private class KafkaCSVMetricsReporter ex
         if (!csvDir.exists())
           csvDir.mkdirs()
         underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
-        if (props.getBoolean("kafka.csv.metrics.reporter.enabled", false))
+        if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {
+          initialized = true
           startReporter(metricsConfig.pollingIntervalSecs)
-        initialized = true
+        }
       }
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Tue Oct  9 21:28:20 2012
@@ -92,7 +92,7 @@ class RequestChannel(val numProcessors: 
   newGauge(
     "RequestQueueSize",
     new Gauge[Int] {
-      def value() = requestQueue.size
+      def getValue = requestQueue.size
     }
   )
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Tue Oct  9 21:28:20 2012
@@ -36,7 +36,7 @@ class ProducerSendThread[K,V](val thread
   newGauge(
     "ProducerQueueSize-" + getId,
     new Gauge[Int] {
-      def value() = queue.size
+      def getValue = queue.size
     }
   )
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Tue Oct  9 21:28:20 2012
@@ -165,7 +165,7 @@ class FetcherLagMetrics(name: (String, I
   newGauge(
     name._1 + "-" + name._2 + "-ConsumerLag",
     new Gauge[Long] {
-      def value() = lagVal.get
+      def getValue = lagVal.get
     }
   )
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Tue Oct  9 21:28:20 2012
@@ -47,13 +47,13 @@ class ReplicaManager(val config: KafkaCo
   newGauge(
     "LeaderCount",
     new Gauge[Int] {
-      def value() = leaderPartitions.size
+      def getValue = leaderPartitions.size
     }
   )
   newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
-      def value() = {
+      def getValue = {
         leaderPartitionsLock synchronized {
           leaderPartitions.count(_.isUnderReplicated)
         }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Tue Oct  9 21:28:20 2012
@@ -69,7 +69,7 @@ abstract class RequestPurgatory[T <: Del
   newGauge(
     "NumDelayedRequests",
     new Gauge[Int] {
-      def value() = expiredRequestReaper.unsatisfied.get()
+      def getValue = expiredRequestReaper.unsatisfied.get()
     }
   )
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala Tue Oct  9 21:28:20 2012
@@ -35,20 +35,20 @@ class KafkaTimerTest extends JUnit3Suite
     timer.time {
       clock.addMillis(1000)
     }
-    assertEquals(1, metric.count())
-    assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
-    assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
+    assertEquals(1, metric.getCount())
+    assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon)
+    assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon)
   }
 
   private class ManualClock extends Clock {
 
     private var ticksInNanos = 0L
 
-    override def tick() = {
+    override def getTick() = {
       ticksInNanos
     }
 
-    override def time() = {
+    override def getTime() = {
       TimeUnit.NANOSECONDS.toMillis(ticksInNanos)
     }
 

Modified: incubator/kafka/branches/0.8/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/project/build/KafkaProject.scala?rev=1396336&r1=1396335&r2=1396336&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/project/build/KafkaProject.scala (original)
+++ incubator/kafka/branches/0.8/project/build/KafkaProject.scala Tue Oct  9 21:28:20 2012
@@ -66,17 +66,42 @@ class KafkaProject(info: ProjectInfo) ex
         <scope>compile</scope>
       </dependency>
 
+    def metricsDeps =
+      <dependencies>
+        <dependency>
+          <groupId>com.yammer.metrics</groupId>
+          <artifactId>metrics-core</artifactId>
+          <version>3.0.0-10ccc80c</version>
+          <scope>compile</scope>
+        </dependency>
+        <dependency>
+          <groupId>com.yammer.metrics</groupId>
+          <artifactId>metrics-annotations</artifactId>
+          <version>3.0.0-10ccc80c</version>
+          <scope>compile</scope>
+        </dependency>
+      </dependencies>
+
     object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
       override def transform(node: Node): Seq[Node] = node match {
         case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
-          Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep :_*)
+          Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep:_*)
+        }
+        case other => other
+      }
+    })
+
+    object MetricsDepAdder extends RuleTransformer(new RewriteRule() {
+      override def transform(node: Node): Seq[Node] = node match {
+        case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
+          Elem(prefix, "dependencies", attribs, scope, deps ++ metricsDeps:_*)
         }
         case other => other
       }
     })
 
     override def pomPostProcess(pom: Node): Node = {
-      ZkClientDepAdder(pom)
+      MetricsDepAdder(ZkClientDepAdder(pom))
     }
 
     override def artifactID = "kafka"
@@ -251,7 +276,6 @@ class KafkaProject(info: ProjectInfo) ex
   trait CoreDependencies {
     val log4j = "log4j" % "log4j" % "1.2.15"
     val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
-    val metricsCore = "com.yammer.metrics" % "metrics-core" % "latest.release"
     val slf4jSimple = "org.slf4j" % "slf4j-simple" % "latest.release"
   }
   



Mime
View raw message