kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1401138 - in /incubator/kafka/branches/0.8: config/ core/lib/ core/src/main/scala/kafka/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/message/ core/src/main/s...
Date Tue, 23 Oct 2012 01:26:10 GMT
Author: nehanarkhede
Date: Tue Oct 23 01:26:08 2012
New Revision: 1401138

URL: http://svn.apache.org/viewvc?rev=1401138&view=rev
Log:
KAFKA-541 Move to metrics csv reporter for system tests; patched by Yang Ye; reviewed by Neha, Jun and Joel

Added:
    incubator/kafka/branches/0.8/core/lib/metrics-annotation-3.0.1.jar   (with props)
    incubator/kafka/branches/0.8/core/lib/metrics-core-3.0.1.jar   (with props)
Removed:
    incubator/kafka/branches/0.8/core/lib/metrics-annotation-3.0.0-10ccc80c.jar
    incubator/kafka/branches/0.8/core/lib/metrics-core-3.0.0-10ccc80c.jar
Modified:
    incubator/kafka/branches/0.8/config/server.properties
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionFactory.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
    incubator/kafka/branches/0.8/project/build/KafkaProject.scala
    incubator/kafka/branches/0.8/system_test/metrics.json
    incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
    incubator/kafka/branches/0.8/system_test/utils/metrics.py

Modified: incubator/kafka/branches/0.8/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/config/server.properties?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/config/server.properties (original)
+++ incubator/kafka/branches/0.8/config/server.properties Tue Oct 23 01:26:08 2012
@@ -115,8 +115,8 @@ zk.connect=localhost:2181
 zk.connectiontimeout.ms=1000000
 
 # metrics reporter properties
-# kafka.metrics.polling.interval.secs=5
-# kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
-# kafka.csv.metrics.dir=kafka_metrics
-# kafka.csv.metrics.reporter.enabled=true
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
+kafka.csv.metrics.dir=kafka_metrics
+kafka.csv.metrics.reporter.enabled=true
 

Added: incubator/kafka/branches/0.8/core/lib/metrics-annotation-3.0.1.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/lib/metrics-annotation-3.0.1.jar?rev=1401138&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/branches/0.8/core/lib/metrics-annotation-3.0.1.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/branches/0.8/core/lib/metrics-core-3.0.1.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/lib/metrics-core-3.0.1.jar?rev=1401138&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/branches/0.8/core/lib/metrics-core-3.0.1.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Tue Oct 23 01:26:08 2012
@@ -17,7 +17,8 @@
 
 package kafka
 
-import metrics.{KafkaMetricsReporterMBean, KafkaMetricsReporter, KafkaMetricsConfig}
+
+import metrics.KafkaCSVMetricsReporter
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
 import utils.{Utils, Logging}
 
@@ -32,15 +33,7 @@ object Kafka extends Logging {
     try {
       val props = Utils.loadProps(args(0))
       val serverConfig = new KafkaConfig(props)
-      val verifiableProps = serverConfig.props
-      val metricsConfig = new KafkaMetricsConfig(verifiableProps)
-      metricsConfig.reporters.foreach(reporterType => {
-        val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
-        reporter.init(verifiableProps)
-        if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
-          Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
-      })
-
+      KafkaCSVMetricsReporter.startCSVMetricReporter(serverConfig.props)
       val kafkaServerStartble = new KafkaServerStartable(serverConfig)
 
       // attach shutdown handler to catch control-c

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Tue Oct 23 01:26:08 2012
@@ -18,8 +18,7 @@
 package kafka.admin
 
 import java.util.Random
-import kafka.api.{TopicMetadata, PartitionMetadata, TopicMetadataRequest, TopicMetadataResponse}
-import kafka.common._
+import kafka.api.{TopicMetadata, PartitionMetadata}
 import kafka.cluster.Broker
 import kafka.utils.{Logging, Utils, ZkUtils}
 import org.I0Itec.zkclient.ZkClient

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala Tue Oct 23 01:26:08 2012
@@ -19,7 +19,7 @@ package kafka.admin
 import joptsimple.OptionParser
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException}
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{TopicAndPartition, AdminCommandFailedException}
 
 object PreferredReplicaLeaderElectionCommand extends Logging {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala Tue Oct 23 01:26:08 2012
@@ -19,7 +19,6 @@ package kafka.api
 
 import kafka.common.ErrorMapping
 import java.nio.ByteBuffer
-import kafka.utils.Utils
 import kafka.api.ApiUtils._
 import collection.mutable.HashMap
 import collection.Map

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Tue Oct 23 01:26:08 2012
@@ -18,7 +18,6 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.utils.Utils
 import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala Tue Oct 23 01:26:08 2012
@@ -19,7 +19,6 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import kafka.common.{ErrorMapping, TopicAndPartition}
-import kafka.utils.Utils
 import kafka.api.ApiUtils._
 
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Tue Oct 23 01:26:08 2012
@@ -19,7 +19,6 @@ package kafka.api
 
 import java.nio._
 import kafka.message._
-import kafka.utils._
 import scala.collection.Map
 import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala Tue Oct 23 01:26:08 2012
@@ -18,7 +18,6 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.utils.Utils
 import scala.collection.Map
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.api.ApiUtils._

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala Tue Oct 23 01:26:08 2012
@@ -18,7 +18,6 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.utils.Utils
 import collection.mutable.HashMap
 import collection.mutable.Map
 import kafka.common.ErrorMapping

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Tue Oct 23 01:26:08 2012
@@ -17,7 +17,6 @@
 
 package kafka.consumer
 
-import scala.collection.mutable._
 import scala.collection.JavaConversions._
 import org.I0Itec.zkclient._
 import joptsimple._
@@ -25,9 +24,10 @@ import java.util.Properties
 import java.util.Random
 import java.io.PrintStream
 import kafka.message._
-import kafka.utils.{Utils, Logging, ZkUtils, CommandLineUtils}
-import kafka.utils.ZKStringSerializer
 import kafka.serializer.StringDecoder
+import kafka.utils._
+import kafka.metrics.KafkaCSVMetricsReporter
+
 
 /**
  * Consumer that dumps messages out to standard out.
@@ -107,6 +107,13 @@ object ConsoleConsumer extends Logging {
             .ofType(classOf[java.lang.Integer])
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
             "skip it instead of halt.")
+    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
+    val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
+            "set, the csv metrics will be outputed here")
+      .withRequiredArg
+      .describedAs("metrics dictory")
+      .ofType(classOf[java.lang.String])
+
 
     val options: OptionSet = tryParse(parser, args)
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
@@ -122,6 +129,20 @@ object ConsoleConsumer extends Logging {
     else
       new Whitelist(topicArg)
 
+    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
+    if (csvMetricsReporterEnabled) {
+      val csvReporterProps = new Properties()
+      csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")
+      csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
+      if (options.has(metricsDirectoryOpt))
+        csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
+      else
+        csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics")
+      csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true")
+      val verifiableProps = new VerifiableProperties(csvReporterProps)
+      KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps)
+    }
+
     val props = new Properties()
     props.put("groupid", options.valueOf(groupIdOpt))
     props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Tue Oct 23 01:26:08 2012
@@ -45,7 +45,7 @@ object ConsumerConfig {
   val DefaultClientId = ""
 }
 
-class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(props) {
+class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
   import ConsumerConfig._
 
   def this(originalProps: Properties) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Tue Oct 23 01:26:08 2012
@@ -26,7 +26,6 @@ import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
-import kafka.utils.Utils._
 import kafka.common.TopicAndPartition
 import kafka.client.ClientUtils
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Tue Oct 23 01:26:08 2012
@@ -33,9 +33,8 @@ import kafka.utils.ZkUtils._
 import kafka.common._
 import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
-import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.Utils._
 import kafka.api.OffsetRequest
+import kafka.metrics._
 
 
 /**
@@ -118,6 +117,8 @@ private[kafka] class ZookeeperConsumerCo
       config.autoCommitIntervalMs, false)
   }
 
+  KafkaCSVMetricsReporter.startCSVMetricReporter(config.props)
+
   def this(config: ConsumerConfig) = this(config, true)
 
   def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T])

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Tue Oct 23 01:26:08 2012
@@ -21,11 +21,10 @@ import scala.reflect.BeanProperty
 import kafka.utils.Logging
 import java.nio.ByteBuffer
 import java.nio.channels._
-import java.io.{InputStream, ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, IOException}
-import java.util.zip._
+import java.io.{InputStream, ByteArrayOutputStream, DataOutputStream}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.IteratorTemplate
-import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException}
+import kafka.common.InvalidMessageSizeException
 
 object ByteBufferMessageSet {
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionFactory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionFactory.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionFactory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionFactory.scala Tue Oct 23 01:26:08 2012
@@ -18,14 +18,9 @@
 package kafka.message
 
 import java.io.OutputStream
-import java.io.ByteArrayOutputStream
 import java.util.zip.GZIPOutputStream
 import java.util.zip.GZIPInputStream
-import java.io.IOException
 import java.io.InputStream
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicLong
-import kafka.utils._
 
 object CompressionFactory {
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala Tue Oct 23 01:26:08 2012
@@ -24,11 +24,33 @@ import com.yammer.metrics.Metrics
 import java.io.File
 import com.yammer.metrics.reporting.CsvReporter
 import java.util.concurrent.TimeUnit
-import kafka.utils.{VerifiableProperties, Logging}
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.utils.{Utils, VerifiableProperties, Logging}
 
 
 private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
 
+object KafkaCSVMetricsReporter {
+  val CSVReporterStarted: AtomicBoolean = new AtomicBoolean(false)
+
+  def startCSVMetricReporter (verifiableProps: VerifiableProperties) {
+    CSVReporterStarted synchronized {
+      if (CSVReporterStarted.get() == false) {
+        val metricsConfig = new KafkaMetricsConfig(verifiableProps)
+        if(metricsConfig.reporters.size > 0) {
+          metricsConfig.reporters.foreach(reporterType => {
+            val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
+            reporter.init(verifiableProps)
+            if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
+              Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
+          })
+          CSVReporterStarted.set(true)
+        }
+      }
+    }
+  }
+}
+
 private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
                               with KafkaCSVMetricsReporterMBean
                               with Logging {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala Tue Oct 23 01:26:08 2012
@@ -40,14 +40,15 @@ trait KafkaMetricsGroup extends Logging 
   }
 
   def newGauge[T](name: String, metric: Gauge[T]) =
-    Metrics.newGauge(metricName(name), metric)
+    Metrics.defaultRegistry().newGauge(metricName(name), metric)
 
   def newMeter(name: String, eventType: String, timeUnit: TimeUnit) =
-    Metrics.newMeter(metricName(name), eventType, timeUnit)
+    Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit)
 
-  def newHistogram(name: String, biased: Boolean = false) = Metrics.newHistogram(metricName(name), biased)
+  def newHistogram(name: String, biased: Boolean = false) =
+    Metrics.defaultRegistry().newHistogram(metricName(name), biased)
 
   def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
-    Metrics.newTimer(metricName(name), durationUnit, rateUnit)
+    Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
 
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Tue Oct 23 01:26:08 2012
@@ -39,22 +39,22 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeNs: Long) {
-    var dequeueTimeNs = -1L
-    var apiLocalCompleteTimeNs = -1L
-    var responseCompleteTimeNs = -1L
+  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long) {
+    var dequeueTimeMs = -1L
+    var apiLocalCompleteTimeMs = -1L
+    var responseCompleteTimeMs = -1L
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer.rewind()
     trace("Received request: %s".format(requestObj))
 
     def updateRequestMetrics() {
-      val endTimeNs = SystemTime.nanoseconds
-      val queueTime = (dequeueTimeNs - startTimeNs).max(0L)
-      val apiLocalTime = (apiLocalCompleteTimeNs - dequeueTimeNs).max(0L)
-      val apiRemoteTime = (responseCompleteTimeNs - apiLocalCompleteTimeNs).max(0L)
-      val responseSendTime = (endTimeNs - responseCompleteTimeNs).max(0L)
-      val totalTime = endTimeNs - startTimeNs
+      val endTimeMs = SystemTime.milliseconds
+      val queueTime = (dequeueTimeMs - startTimeMs).max(0L)
+      val apiLocalTime = (apiLocalCompleteTimeMs - dequeueTimeMs).max(0L)
+      val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
+      val responseSendTime = (endTimeMs - responseCompleteTimeMs).max(0L)
+      val totalTime = endTimeMs - startTimeMs
       var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
       if (requestId == RequestKeys.FetchKey) {
         val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
@@ -76,7 +76,7 @@ object RequestChannel extends Logging {
   }
   
   case class Response(processor: Int, request: Request, responseSend: Send) {
-    request.responseCompleteTimeNs = SystemTime.nanoseconds
+    request.responseCompleteTimeMs = SystemTime.milliseconds
 
     def this(request: Request, send: Send) =
       this(request.processor, request, send)
@@ -133,13 +133,13 @@ object RequestMetrics {
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
   val requestRate = newMeter(name + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
   // time a request spent in a request queue
-  val queueTimeHist = newHistogram(name + "-QueueTimeNs")
+  val queueTimeHist = newHistogram(name + "-QueueTimeMs")
   // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram(name + "-LocalTimeNs")
+  val localTimeHist = newHistogram(name + "-LocalTimeMs")
   // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
-  val remoteTimeHist = newHistogram(name + "-RemoteTimeNs")
+  val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeNs")
-  val totalTimeHist = newHistogram(name + "-TotalTimeNs")
+  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
+  val totalTimeHist = newHistogram(name + "-TotalTimeMs")
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Tue Oct 23 01:26:08 2012
@@ -295,7 +295,7 @@ private[kafka] class Processor(val id: I
     if(read < 0) {
       close(key)
     } else if(receive.complete) {
-      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeNs = time.nanoseconds)
+      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds)
       requestChannel.sendRequest(req)
       trace("Recieved request, sending for processing by handler: " + req)
       key.attach(null)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Tue Oct 23 01:26:08 2012
@@ -17,11 +17,10 @@
 package kafka.producer
 
 import collection.mutable.HashMap
-import kafka.api.{TopicMetadataRequest, TopicMetadata}
+import kafka.api.TopicMetadata
 import kafka.common.KafkaException
-import kafka.utils.{Logging, Utils}
+import kafka.utils.Logging
 import kafka.common.ErrorMapping
-import kafka.cluster.Broker
 import kafka.client.ClientUtils
 
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Tue Oct 23 01:26:08 2012
@@ -23,7 +23,8 @@ import java.util.concurrent.{TimeUnit, L
 import kafka.serializer.Encoder
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{QueueFullException, InvalidConfigException}
-import kafka.metrics.KafkaMetricsGroup
+import kafka.metrics._
+
 
 class Producer[K,V](config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V]) // for testing only
@@ -48,6 +49,8 @@ extends Logging {
     case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
+  KafkaCSVMetricsReporter.startCSVMetricReporter(config.props)
+
   def this(config: ProducerConfig) =
     this(config,
          new DefaultEventHandler[K,V](config,

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Tue Oct 23 01:26:08 2012
@@ -21,7 +21,7 @@ import kafka.cluster.Broker
 import java.util.Properties
 import collection.mutable.HashMap
 import java.lang.Object
-import kafka.utils.{Utils, Logging}
+import kafka.utils.Logging
 import kafka.api.TopicMetadata
 import kafka.common.UnavailableProducerException
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Tue Oct 23 01:26:08 2012
@@ -69,7 +69,6 @@ class SyncProducer(val config: SyncProdu
   private def doSend(request: RequestOrResponse): Receive = {
     lock synchronized {
       verifyRequest(request)
-      val startTime = SystemTime.nanoseconds
       getOrMakeConnection()
 
       var response: Receive = null

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Tue Oct 23 01:26:08 2012
@@ -61,7 +61,7 @@ class KafkaApis(val requestChannel: Requ
       case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
       case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
     }
-    request.apiLocalCompleteTimeNs = SystemTime.nanoseconds
+    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Tue Oct 23 01:26:08 2012
@@ -21,8 +21,7 @@ import java.util.Properties
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import java.net.InetAddress
-import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig}
-import scala.collection._
+import kafka.utils.{VerifiableProperties, ZKConfig}
 
 /**
  * Configuration settings for the kafka server

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Tue Oct 23 01:26:08 2012
@@ -35,7 +35,7 @@ class KafkaRequestHandler(id: Int, broke
         trace("receives shut down command, shut down".format(brokerId, id))
         return
       }
-      req.dequeueTimeNs = SystemTime.nanoseconds
+      req.dequeueTimeMs = SystemTime.milliseconds
       debug("handles request " + req)
       apis.handle(req)
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala Tue Oct 23 01:26:08 2012
@@ -22,7 +22,6 @@ import joptsimple._
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import kafka.consumer.SimpleConsumer
-import collection.mutable.Map
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.TopicAndPartition
 import scala.collection._

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Tue Oct 23 01:26:08 2012
@@ -22,9 +22,8 @@ import java.util.concurrent.{Executors, 
 import java.util.Properties
 import kafka.producer.{ProducerData, ProducerConfig, Producer}
 import kafka.consumer._
-import kafka.utils.{ZKStringSerializer, Logging, ZkUtils}
+import kafka.utils.{Logging, ZkUtils}
 import kafka.api.OffsetRequest
-import org.I0Itec.zkclient._
 import kafka.message.{CompressionCodec, Message}
 
 object ReplayLogProducer extends Logging {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Tue Oct 23 01:26:08 2012
@@ -111,7 +111,7 @@ class TopicMetadataTest extends JUnit3Su
 
     // call the API (to be tested) to get metadata
     apis.handleTopicMetadataRequest(new RequestChannel.Request
-      (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeNs=1))
+      (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeMs=1))
     val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
     
     // check assertions

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Tue Oct 23 01:26:08 2012
@@ -21,9 +21,7 @@ import java.io._
 import junit.framework.Assert._
 import org.junit.Test
 import kafka.common.OffsetOutOfRangeException
-import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
-import kafka.admin.CreateTopicCommand
 import kafka.server.KafkaConfig
 import kafka.utils._
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Tue Oct 23 01:26:08 2012
@@ -30,7 +30,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
-import kafka.common.{ErrorMapping, TopicAndPartition, UnknownTopicOrPartitionException}
+import kafka.common.{ErrorMapping, TopicAndPartition}
 
 object LogOffsetTest {
   val random = new Random()  

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala Tue Oct 23 01:26:08 2012
@@ -2,7 +2,7 @@ package kafka.log
 
 import junit.framework.Assert._
 import java.util.concurrent.atomic._
-import org.junit.{Test, Before, After}
+import org.junit.{Test, After}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
 import kafka.message._

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Tue Oct 23 01:26:08 2012
@@ -24,7 +24,7 @@ import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message._
-import kafka.common.{MessageSizeTooLargeException, KafkaException, OffsetOutOfRangeException}
+import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
 import kafka.utils._
 import scala.Some
 import kafka.server.KafkaConfig

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Tue Oct 23 01:26:08 2012
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.Atomi
 import junit.framework.Assert._
 import org.junit.Test
 import kafka.utils.TestUtils
-import kafka.common.InvalidMessageSizeException
 
 class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala Tue Oct 23 01:26:08 2012
@@ -18,9 +18,7 @@
 package kafka.message
 
 import java.io.ByteArrayOutputStream
-import java.util.concurrent.atomic.AtomicLong
 import scala.collection._
-import kafka.utils.TestUtils
 import org.scalatest.junit.JUnitSuite
 import org.junit._
 import junit.framework.Assert._

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala Tue Oct 23 01:26:08 2012
@@ -23,9 +23,6 @@ import scala.collection._
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{Before, Test}
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.runners.Parameterized.Parameters
 import kafka.utils.TestUtils
 import kafka.utils.Utils
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Tue Oct 23 01:26:08 2012
@@ -19,7 +19,6 @@ package kafka.producer
 
 import java.util.{LinkedList, Properties}
 import java.util.concurrent.LinkedBlockingQueue
-import java.io.IOException
 import junit.framework.Assert._
 import org.easymock.EasyMock
 import org.junit.Test

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Tue Oct 23 01:26:08 2012
@@ -22,7 +22,7 @@ import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
-import kafka.message.{Message, MessageSet, ByteBufferMessageSet}
+import kafka.message.Message
 import kafka.producer.{ProducerConfig, ProducerData, Producer}
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Tue Oct 23 01:26:08 2012
@@ -99,7 +99,7 @@ class SimpleFetchTest extends JUnit3Suit
     val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
 
     // send the request
-    apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeNs=1))
+    apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeMs=1))
 
     // make sure the log only reads bytes between 0->HW (5)
     EasyMock.verify(log)
@@ -123,7 +123,7 @@ class SimpleFetchTest extends JUnit3Suit
     apis.handleOffsetRequest(new RequestChannel.Request(processor = 0,
                                                         requestKey = 5,
                                                         buffer = offsetRequestBB,
-                                                        startTimeNs = 1))
+                                                        startTimeMs = 1))
     val offsetResponseBuffer = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
     val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
     EasyMock.verify(replicaManager)
@@ -198,7 +198,7 @@ class SimpleFetchTest extends JUnit3Suit
     val fetchRequestBB = TestUtils.createRequestByteBuffer(bigFetch)
 
     // send the request
-    apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeNs=1))
+    apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeMs=1))
 
     /**
      * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after
@@ -226,7 +226,7 @@ class SimpleFetchTest extends JUnit3Suit
     apis.handleOffsetRequest(new RequestChannel.Request(processor = 1,
                                                         requestKey = 5,
                                                         buffer = offsetRequestBB,
-                                                        startTimeNs = 1))
+                                                        startTimeMs = 1))
     val offsetResponseBuffer = requestChannel.receiveResponse(1).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
     val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
     EasyMock.verify(replicaManager)

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Tue Oct 23 01:26:08 2012
@@ -24,7 +24,9 @@ import org.apache.log4j.Logger
 import kafka.message.{CompressionCodec, Message}
 import java.text.SimpleDateFormat
 import java.util.{Random, Properties}
-import kafka.utils.Logging
+import kafka.utils.{VerifiableProperties, Logging}
+import kafka.metrics.{KafkaCSVMetricsReporter, KafkaMetricsReporterMBean, KafkaMetricsReporter, KafkaMetricsConfig}
+
 
 /**
  * Load test for the producer
@@ -113,6 +115,12 @@ object ProducerPerformance extends Loggi
       .describedAs("initial message id")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(0)
+    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
+    val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
+            "set, the csv metrics will be outputed here")
+      .withRequiredArg
+      .describedAs("metrics dictory")
+      .ofType(classOf[java.lang.String])
 
     val options = parser.parse(args : _*)
     for(arg <- List(topicOpt, brokerListOpt, numMessagesOpt)) {
@@ -140,6 +148,21 @@ object ProducerPerformance extends Loggi
     val produceRequestTimeoutMs = options.valueOf(produceRequestTimeoutMsOpt).intValue()
     val produceRequestRequiredAcks = options.valueOf(produceRequestRequiredAcksOpt).intValue()
 
+    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
+
+    if (csvMetricsReporterEnabled) {
+      val props = new Properties()
+      props.put("kafka.metrics.polling.interval.secs", "5")
+      props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
+      if (options.has(metricsDirectoryOpt))
+        props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
+      else
+        props.put("kafka.csv.metrics.dir", "kafka_metrics")
+      props.put("kafka.csv.metrics.reporter.enabled", "true")
+      val verifiableProps = new VerifiableProperties(props)
+      KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps)
+    }
+
     // override necessary flags in seqIdMode
     if (seqIdMode) { 
       batchSize = 1

Modified: incubator/kafka/branches/0.8/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/project/build/KafkaProject.scala?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/project/build/KafkaProject.scala (original)
+++ incubator/kafka/branches/0.8/project/build/KafkaProject.scala Tue Oct 23 01:26:08 2012
@@ -71,13 +71,13 @@ class KafkaProject(info: ProjectInfo) ex
         <dependency>
           <groupId>com.yammer.metrics</groupId>
           <artifactId>metrics-core</artifactId>
-          <version>3.0.0-10ccc80c</version>
+          <version>3.0.1</version>
           <scope>compile</scope>
         </dependency>
         <dependency>
           <groupId>com.yammer.metrics</groupId>
           <artifactId>metrics-annotations</artifactId>
-          <version>3.0.0-10ccc80c</version>
+          <version>3.0.1</version>
           <scope>compile</scope>
         </dependency>
       </dependencies>

Modified: incubator/kafka/branches/0.8/system_test/metrics.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/metrics.json?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/metrics.json (original)
+++ incubator/kafka/branches/0.8/system_test/metrics.json Tue Oct 23 01:26:08 2012
@@ -11,14 +11,14 @@
                },
                { 
                   "graph_name": "Produce-Request-Time",
-                  "y_label": "ns,ns",
-                  "bean_name": "kafka.network:type=RequestMetrics,name=Produce-TotalTimeNs",
+                  "y_label": "ms,ms",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Produce-TotalTimeMs",
                   "attributes": "Mean,99thPercentile"
                },
                { 
                   "graph_name": "Produce-Request-Remote-Time",
-                  "y_label": "ns,ns",
-                  "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RemoteTimeNs",
+                  "y_label": "ms,ms",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RemoteTimeMs",
                   "attributes": "Mean,99thPercentile"
                },
                { 
@@ -29,14 +29,14 @@
                },
                { 
                   "graph_name": "Fetch-Consumer-Request-Time",
-                  "y_label": "ns,ns",
-                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-TotalTimeNs",
+                  "y_label": "ms,ms",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-TotalTimeMs",
                   "attributes": "Mean,99thPercentile"
                },
                { 
                   "graph_name": "Fetch-Consumer-Request-Remote-Time",
-                  "y_label": "ns,ns",
-                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RemoteTimeNs",
+                  "y_label": "ms,ms",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RemoteTimeMs",
                   "attributes": "Mean,99thPercentile"
                },
                { 
@@ -47,14 +47,14 @@
                },
                { 
                   "graph_name": "Fetch-Follower-Request-Time",
-                  "y_label": "ns,ns",
-                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-TotalTimeNs",
+                  "y_label": "ms,ms",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-TotalTimeMs",
                   "attributes": "Mean,99thPercentile"
                },
                { 
                   "graph_name": "Fetch-Follower-Request-Remote-Time",
-                  "y_label": "ns,ns",
-                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RemoteTimeNs",
+                  "y_label": "ms,ms",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RemoteTimeMs",
                   "attributes": "Mean,99thPercentile"
                },
                { 
@@ -90,13 +90,13 @@
                {
                   "graph_name": "ControllerLeaderElectionRateAndTime",
                   "y_label": "elections-per-sec,ms,ms",
-                  "bean_name": "kafka.server:type=ControllerStat,name=LeaderElectionRateAndTimeMs",
+                  "bean_name": "kafka.controller:type=ControllerStat,name=LeaderElectionRateAndTimeMs",
                   "attributes": "OneMinuteRate,Mean,99thPercentile"
                },
                {
                   "graph_name": "LogFlushRateAndTime",
                   "y_label": "flushes-per-sec,ms,ms",
-                  "bean_name": "kafka.message:type=LogFlushStats,name=LogFlushRateAndTimeMs",
+                  "bean_name": "kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs",
                   "attributes": "OneMinuteRate,Mean,99thPercentile"
                },
                {

Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Tue Oct 23 01:26:08 2012
@@ -366,8 +366,13 @@ def generate_overriden_props_files(tests
                         logger.error("Unknown cluster name: " + clusterName)
                         sys.exit(1)
 
+                    addedCSVConfig = {}
+                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics") 
+                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
+                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" 
+                    addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true"
                     copy_file_with_dict_values(cfgTemplatePathname + "/server.properties",
-                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, None)
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, addedCSVConfig)
 
                 elif ( clusterCfg["role"] == "zookeeper"):
                     if clusterCfg["cluster_name"] == "source":
@@ -391,6 +396,33 @@ def generate_overriden_props_files(tests
                     tcCfg["zk.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
                     copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties",
                         cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None)
+                
+                elif ( clusterCfg["role"] == "producer_performance" ):
+                    copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties",
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, None)
+
+                elif ( clusterCfg["role"] == "console_consumer" ):
+                    copy_file_with_dict_values(cfgTemplatePathname + "/console_consumer.properties",
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, None)
+                
+                elif ( clusterCfg["role"] == "producer" ):
+                    addedCSVConfig = {}
+                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "producer", clusterCfg["entity_id"], "metrics") 
+                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
+                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" 
+                    addedCSVConfig["kfka.metrics.polling.interval.secsafka.csv.metrics.reporter.enabled"] = "true"
+                    copy_file_with_dict_values(cfgTemplatePathname + "/producer.properties",
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, addedCSVConfig)
+
+
+                elif ( clusterCfg["role"] == "consumer" ):
+                    addedCSVConfig = {}
+                    addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "consumer", clusterCfg["entity_id"], "metrics") 
+                    addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" 
+                    addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" 
+                    addedCSVConfig["kfka.metrics.polling.interval.secsafka.csv.metrics.reporter.enabled"] = "true"
+                    copy_file_with_dict_values(cfgTemplatePathname + "/consumer.properties",
+                        cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg, addedCSVConfig)
                 else:
                     logger.debug("UNHANDLED role " + clusterCfg["role"], extra=d)
 
@@ -669,10 +701,6 @@ def start_entity_in_background(systemTes
             elif role == "mirror_maker":
                 testcaseEnv.entityMirrorMakerParentPidDict[entityId] = tokens[1]
 
-    time.sleep(1)
-    if role != "mirror_maker":
-        metrics.start_metrics_collection(hostname, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
-
 
 def start_console_consumer(systemTestEnv, testcaseEnv):
 
@@ -729,6 +757,8 @@ def start_console_consumer(systemTestEnv
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
                    "--consumer-timeout-ms " + timeoutMs,
+                   "--csv-reporter-enable",
+                   "--metrics-dir " + get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "metrics"),
                    formatterOption,
                    "--from-beginning ",
                    " >> " + consumerLogPathName,
@@ -738,8 +768,6 @@ def start_console_consumer(systemTestEnv
 
         logger.debug("executing command: [" + cmdStr + "]", extra=d)
         system_test_utils.async_sys_call(cmdStr)
-        time.sleep(2)
-        metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
         pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'"
         logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
@@ -780,8 +808,6 @@ def start_producer_performance(systemTes
         logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d)
         time.sleep(1)
         testcaseEnv.lock.release()
-        time.sleep(1)
-        metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
 def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client):
     host              = producerConfig["hostname"]
@@ -850,6 +876,8 @@ def start_producer_in_thread(testcaseEnv
                        "--compression-codec " + compCodec,
                        "--message-size " + messageSize,
                        "--request-num-acks " + requestNumAcks,
+                       "--csv-reporter-enabled",
+                       "--metrics-dir " + get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "metrics"),
                        boolArgumentsStr,
                        " >> " + producerLogPathName,
                        " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]

Modified: incubator/kafka/branches/0.8/system_test/utils/metrics.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/metrics.py?rev=1401138&r1=1401137&r2=1401138&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/metrics.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/metrics.py Tue Oct 23 01:26:08 2012
@@ -45,6 +45,26 @@ logger     = logging.getLogger("namedLog
 thisClassName = '(metrics)'
 d = {'name_of_class': thisClassName}
 
+attributeNameToNameInReportedFileMap = {
+    'Min': 'min',
+    'Max': 'max',
+    'Mean': 'mean',
+    '50thPercentile': 'median',
+    'StdDev': 'stddev',
+    '95thPercentile': '95%',
+    '99thPercentile': '99%',
+    '999thPercentile': '99.9%',
+    'Count': 'count',
+    'OneMinuteRate': '1 min rate',
+    'MeanRate': 'mean rate',
+    'FiveMinuteRate': '5 min rate',
+    'FifteenMinuteRate': '15 min rate',
+    'Value': 'value'
+}
+
+def getCSVFileNameFromMetricsMbeanName(mbeanName):
+    return mbeanName.replace(":type=", ".").replace(",name=", ".") + ".csv"
+
 def read_metrics_definition(metricsFile):
     metricsFileData = open(metricsFile, "r").read()
     metricsJsonData = json.loads(metricsFileData)
@@ -71,7 +91,7 @@ def get_dashboard_definition(metricsFile
     return dashboardsForRole
 
 def ensure_valid_headers(headers, attributes):
-    if headers[0] != "time":
+    if headers[0] != "# time":
         raise Exception("First column should be time")
     for header in headers:
         logger.debug(header, extra=d)
@@ -108,14 +128,17 @@ def plot_graphs(inputCsvFiles, labels, t
         try:
             # read first line as the headers
             headers = csv_reader.pop(0)
-            attributeColumnIndex = ensure_valid_headers(headers, attribute)
+            attributeColumnIndex = ensure_valid_headers(headers, attributeNameToNameInReportedFileMap[attribute])
             logger.debug("Column index for attribute {0} is {1}".format(attribute, attributeColumnIndex), extra=d)
-            start_time = int(csv_reader[0][0])
+            start_time = (int)(os.path.getctime(inputCsvFile) * 1000)
+            int(csv_reader[0][0])
             for line in csv_reader:
+                if(len(line) == 0):
+                    continue
                 yVal = float(line[attributeColumnIndex])                
-                xVal = (int(line[0])-start_time)/1000
+                xVal = int(line[0])
                 y.append(yVal)
-                epoch=int(line[0])/1000
+                epoch= start_time + int(line[0])
                 x.append(xVal)
                 xticks_labels.append(time.strftime("%H:%M:%S", time.localtime(epoch)))
                 coordinates.append(Coordinates(xVal, yVal))
@@ -160,9 +183,12 @@ def draw_graph_for_role(graphs, entities
         graphLegendLabels = []
         for entity in entities:
             entityMetricsDir = kafka_system_test_utils.get_testcase_config_log_dir_pathname(testcaseEnv, role, entity['entity_id'], "metrics")
-            entityMetricCsvFile = entityMetricsDir + "/" + graph['bean_name'] + ".csv"
-            inputCsvFiles.append(entityMetricCsvFile)
-            graphLegendLabels.append(role + "-" + entity['entity_id'])
+            entityMetricCsvFile = entityMetricsDir + "/" + getCSVFileNameFromMetricsMbeanName(graph['bean_name'])
+            if(not os.path.exists(entityMetricCsvFile)):
+                logger.warn("The file {0} does not exist for plotting".format(entityMetricCsvFile), extra=d)
+            else:
+                inputCsvFiles.append(entityMetricCsvFile)
+                graphLegendLabels.append(role + "-" + entity['entity_id'])
 #            print "Plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
         try:
             # plot one graph per mbean attribute
@@ -173,7 +199,7 @@ def draw_graph_for_role(graphs, entities
             for labelAndAttribute in zip(labels, fullyQualifiedAttributeNames, attributes):            
                 outputGraphFile = testcaseEnv.testCaseDashboardsDir + "/" + role + "/" + labelAndAttribute[1] + ".svg"            
                 plot_graphs(inputCsvFiles, graphLegendLabels, graph['graph_name'] + '-' + labelAndAttribute[2], 
-                            "time", labelAndAttribute[0], labelAndAttribute[1], outputGraphFile)
+                            "time", labelAndAttribute[0], labelAndAttribute[2], outputGraphFile)
 #            print "Finished plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
         except Exception as e:
             logger.error("ERROR while plotting graph {0}: {1}".format(outputGraphFile, e), extra=d)



Mime
View raw message