kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5244; Refactor BrokerTopicStats and ControllerStats so that they are classes
Date Mon, 15 May 2017 21:35:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 018679410 -> 46aa88b9c


KAFKA-5244; Refactor BrokerTopicStats and ControllerStats so that they are classes

This removes the need to force object initialisation via hacks to register
the relevant Yammer metrics during start-up.

It also works around issues caused by tests that delete JVM-wide singleton
metrics (like `MetricsDuringTopicCreationDeletionTest`). Without this
change, the deleted metrics would never be registered again. After this
change, they will be registered again during KafkaServer start-up.

It would be even better not to rely on JVM-wide singleton metrics (like we do
for Kafka Metrics), but that's a bigger change that should be considered
separately.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3059 from ijuma/kafka-5244-broker-static-stats-and-controller-stats-as-classes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/46aa88b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/46aa88b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/46aa88b9

Branch: refs/heads/trunk
Commit: 46aa88b9cf82971a0890f4f83efa9639f84c677b
Parents: 0186794
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon May 15 22:35:17 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon May 15 22:35:17 2017 +0100

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      |  24 ++--
 .../controller/PartitionLeaderSelector.scala    |   2 +-
 core/src/main/scala/kafka/log/Log.scala         |  10 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  13 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   3 +-
 .../kafka/server/KafkaRequestHandler.scala      |  32 +++--
 .../main/scala/kafka/server/KafkaServer.scala   |  28 ++--
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 .../scala/kafka/server/ReplicaManager.scala     |  31 ++---
 .../ReplicaFetcherThreadFatalErrorTest.scala    |   2 +-
 .../test/scala/other/kafka/StressTestLog.scala  |   4 +-
 .../other/kafka/TestLinearWriteSpeed.scala      |   3 +-
 .../log/AbstractLogCleanerIntegrationTest.scala |   4 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |   7 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |   7 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |   4 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 128 +++++++++++++------
 .../scala/unit/kafka/metrics/MetricsTest.scala  |   2 +-
 .../server/HighwatermarkPersistenceTest.scala   |   4 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |   3 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala |   3 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  18 ++-
 .../unit/kafka/server/SimpleFetchTest.scala     |  12 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   3 +-
 24 files changed, 221 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index be4dd53..41a88d9 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -42,7 +42,10 @@ import scala.collection._
 import scala.util.Try
 
 class ControllerContext(val zkUtils: ZkUtils) {
+  val controllerStats = new ControllerStats
+
   var controllerChannelManager: ControllerChannelManager = null
+
   var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
   var epoch: Int = KafkaController.InitialControllerEpoch - 1
   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
@@ -150,10 +153,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   val controllerContext = new ControllerContext(zkUtils)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
+
   // have a separate scheduler for the controller to be able to start and stop independently of the
   // kafka server
   private val kafkaScheduler = new KafkaScheduler(1)
-  val topicDeletionManager: TopicDeletionManager = new TopicDeletionManager(this)
+
+  val topicDeletionManager = new TopicDeletionManager(this)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -162,6 +167,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
 
   private val controllerEventQueue = new LinkedBlockingQueue[ControllerEvent]
   private val controllerEventThread = new ControllerEventThread("controller-event-thread")
+
   private val brokerChangeListener = new BrokerChangeListener(this)
   private val topicChangeListener = new TopicChangeListener(this)
   private val topicDeletionListener = new TopicDeletionListener(this)
@@ -169,6 +175,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   private val partitionReassignmentListener = new PartitionReassignmentListener(this)
   private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
   private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
+
   private val activeControllerId = new AtomicInteger(-1)
   private val offlinePartitionCount = new AtomicInteger(0)
   private val preferredReplicaImbalanceCount = new AtomicInteger(0)
@@ -1155,7 +1162,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent {
     override def process(): Unit = {
       if (!isActive) return
-      ControllerStats.leaderElectionTimer.time {
+      controllerContext.controllerStats.leaderElectionTimer.time {
         try {
           val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
           val curBrokerIds = curBrokers.map(_.id)
@@ -1713,16 +1720,9 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo
   }
 }
 
-object ControllerStats extends KafkaMetricsGroup {
-
-  private val _uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
-  private val _leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-
-  // KafkaServer needs to initialize controller metrics during startup. We perform initialization
-  // through method calls to avoid Scala compiler warnings.
-  def uncleanLeaderElectionRate: Meter = _uncleanLeaderElectionRate
-
-  def leaderElectionTimer: KafkaTimer = _leaderElectionTimer
+private[controller] class ControllerStats extends KafkaMetricsGroup {
+  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
+  val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 }
 
 sealed trait ControllerEvent {

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index d1799a6..42db26b 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -74,7 +74,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
               throw new NoReplicaOnlineException(s"No replica for partition $topicAndPartition is alive. Live " +
                 s"brokers are: [${controllerContext.liveBrokerIds}]. Assigned replicas are: [$assignedReplicas].")
             } else {
-              ControllerStats.uncleanLeaderElectionRate.mark()
+              controllerContext.controllerStats.uncleanLeaderElectionRate.mark()
               val newLeader = liveAssignedReplicas.head
               warn(s"No broker in ISR is alive for $topicAndPartition. Elect leader $newLeader from live " +
                 s"brokers ${liveAssignedReplicas.mkString(",")}. There's potential data loss.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a4796d1..e2d9489 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -113,6 +113,7 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i
  *                       Other activities such as log cleaning are not affected by logStartOffset.
  * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
  * @param scheduler The thread pool scheduler used for background actions
+ * @param brokerTopicStats Container for Broker Topic Yammer Metrics
  * @param time The time instance used for checking the clock
  * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired
  * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
@@ -123,6 +124,7 @@ class Log(@volatile var dir: File,
           @volatile var logStartOffset: Long = 0L,
           @volatile var recoveryPoint: Long = 0L,
           scheduler: Scheduler,
+          brokerTopicStats: BrokerTopicStats,
           time: Time = Time.SYSTEM,
           val maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
           val producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup {
@@ -560,8 +562,8 @@ class Log(@volatile var dir: File,
               if (batch.sizeInBytes > config.maxMessageSize) {
                 // we record the original message set size instead of the trimmed size
                 // to be consistent with pre-compression bytesRejectedRate recording
-                BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
-                BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
+                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                 throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
                   .format(batch.sizeInBytes, config.maxMessageSize))
               }
@@ -746,8 +748,8 @@ class Log(@volatile var dir: File,
       // Check if the message sizes are valid.
       val batchSize = batch.sizeInBytes
       if (batchSize > config.maxMessageSize) {
-        BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
-        BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
+        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
         throw new RecordTooLargeException(s"The record batch size is $batchSize bytes which exceeds the maximum configured " +
           s"value of ${config.maxMessageSize}.")
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4ce4716..0b094df 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -55,6 +55,7 @@ class LogManager(val logDirs: Array[File],
                  val maxPidExpirationMs: Int,
                  scheduler: Scheduler,
                  val brokerState: BrokerState,
+                 brokerTopicStats: BrokerTopicStats,
                  time: Time) extends Logging {
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
@@ -175,7 +176,8 @@ class LogManager(val logDirs: Array[File],
             recoveryPoint = logRecoveryPoint,
             maxProducerIdExpirationMs = maxPidExpirationMs,
             scheduler = scheduler,
-            time = time)
+            time = time,
+            brokerTopicStats = brokerTopicStats)
           if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
             this.logsToBeDeleted.add(current)
           } else {
@@ -416,7 +418,8 @@ class LogManager(val logDirs: Array[File],
           recoveryPoint = 0L,
           maxProducerIdExpirationMs = maxPidExpirationMs,
           scheduler = scheduler,
-          time = time)
+          time = time,
+          brokerTopicStats = brokerTopicStats)
         logs.put(topicPartition, log)
         info("Created log for partition [%s,%d] in %s with properties {%s}."
           .format(topicPartition.topic,
@@ -572,7 +575,8 @@ object LogManager {
             zkUtils: ZkUtils,
             brokerState: BrokerState,
             kafkaScheduler: KafkaScheduler,
-            time: Time): LogManager = {
+            time: Time,
+            brokerTopicStats: BrokerTopicStats): LogManager = {
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
     val defaultLogConfig = LogConfig(defaultProps)
 
@@ -602,6 +606,7 @@ object LogManager {
       maxPidExpirationMs = config.transactionIdExpirationMs,
       scheduler = kafkaScheduler,
       brokerState = brokerState,
-      time = time)
+      time = time,
+      brokerTopicStats = brokerTopicStats)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5f1a2d5..c746365 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -70,6 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val metrics: Metrics,
                 val authorizer: Option[Authorizer],
                 val quotas: QuotaManagers,
+                brokerTopicStats: BrokerTopicStats,
                 val clusterId: String,
                 time: Time) extends Logging {
 
@@ -516,7 +517,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchedPartitionData.put(topicPartition, data)
 
         // record the bytes out metrics only when the response is being sent
-        BrokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
+        brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
       }
 
       val response = new FetchResponse(fetchedPartitionData, 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 8dfbe64..9983e3d 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -107,7 +107,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
 
 class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
   val tags: scala.collection.Map[String, String] = name match {
-    case None => scala.collection.Map.empty
+    case None => Map.empty
     case Some(topic) => Map("topic" -> topic)
   }
 
@@ -142,7 +142,7 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
   }
 }
 
-object BrokerTopicStats extends Logging {
+object BrokerTopicStats {
   val MessagesInPerSec = "MessagesInPerSec"
   val BytesInPerSec = "BytesInPerSec"
   val BytesOutPerSec = "BytesOutPerSec"
@@ -153,25 +153,26 @@ object BrokerTopicStats extends Logging {
   val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
   val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
   val TotalFetchRequestsPerSec = "TotalFetchRequestsPerSec"
-
   private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
-  private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
-  private val allTopicsStats = new BrokerTopicMetrics(None)
+}
 
-  def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
+class BrokerTopicStats {
+  import BrokerTopicStats._
+
+  private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
+  val allTopicsStats = new BrokerTopicMetrics(None)
 
-  def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
+  def topicStats(topic: String): BrokerTopicMetrics =
     stats.getAndMaybePut(topic)
-  }
 
   def updateReplicationBytesIn(value: Long) {
-    getBrokerAllTopicsStats.replicationBytesInRate.foreach { metric =>
+    allTopicsStats.replicationBytesInRate.foreach { metric =>
       metric.mark(value)
     }
   }
 
   private def updateReplicationBytesOut(value: Long) {
-    getBrokerAllTopicsStats.replicationBytesOutRate.foreach { metric =>
+    allTopicsStats.replicationBytesOutRate.foreach { metric =>
       metric.mark(value)
     }
   }
@@ -186,8 +187,15 @@ object BrokerTopicStats extends Logging {
     if (isFollower) {
       updateReplicationBytesOut(value)
     } else {
-      getBrokerTopicStats(topic).bytesOutRate.mark(value)
-      getBrokerAllTopicsStats.bytesOutRate.mark(value)
+      topicStats(topic).bytesOutRate.mark(value)
+      allTopicsStats.bytesOutRate.mark(value)
     }
   }
+
+
+  def close(): Unit = {
+    allTopicsStats.close()
+    stats.values.foreach(_.close())
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index fdf837c..788f718 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -139,9 +139,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
 
   private var _clusterId: String = null
+  private var _brokerTopicStats: BrokerTopicStats = null
 
   def clusterId: String = _clusterId
 
+  private[kafka] def brokerTopicStats = _brokerTopicStats
+
   newGauge(
     "BrokerState",
     new Gauge[Int] {
@@ -204,11 +207,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         val metricConfig = KafkaServer.metricConfig(config)
         metrics = new Metrics(metricConfig, reporters, time, true)
 
+        /* register broker metrics */
+        _brokerTopicStats = new BrokerTopicStats
+
         quotaManagers = QuotaFactory.instantiate(config, metrics, time)
         notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
 
         /* start log manager */
-        logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time)
+        logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats)
         logManager.startup()
 
         metadataCache = new MetadataCache(config.brokerId)
@@ -246,7 +252,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
-          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, clusterId, time)
+          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+          brokerTopicStats, clusterId, time)
 
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
           config.numIoThreads)
@@ -277,9 +284,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
         checkpointBrokerId(config.brokerId)
 
-        /* register broker metrics */
-        registerStats()
-
         brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
@@ -304,7 +308,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   }
 
   protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
-    new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower, metadataCache)
+    new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower,
+      brokerTopicStats, metadataCache)
 
   private def initZk(): ZkUtils = {
     info(s"Connecting to zookeeper on ${config.zkConnect}")
@@ -345,15 +350,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   }
 
   /**
-   *  Forces some dynamic jmx beans to be registered on server startup.
-   */
-  private def registerStats() {
-    BrokerTopicStats.getBrokerAllTopicsStats()
-    ControllerStats.uncleanLeaderElectionRate
-    ControllerStats.leaderElectionTimer
-  }
-
-  /**
    *  Performs controlled shutdown
    */
   private def controlledShutdown() {
@@ -620,6 +616,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         if (metrics != null)
           CoreUtils.swallow(metrics.close())
+        if (brokerTopicStats != null)
+          CoreUtils.swallow(brokerTopicStats.close())
 
         brokerState.newState(NotRunning)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 1148e92..bd678b3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -112,7 +112,7 @@ class ReplicaFetcherThread(name: String,
         trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
       if (quota.isThrottled(topicPartition))
         quota.record(records.sizeInBytes)
-      BrokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
+      replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
     } catch {
       case e: KafkaStorageException =>
         fatal(s"Disk error while replicating data for $topicPartition", e)
@@ -368,4 +368,4 @@ object ReplicaFetcherThread {
       case e => Some(e.exception)
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b3d1d32..eec9ab2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -131,6 +131,7 @@ class ReplicaManager(val config: KafkaConfig,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
                      quotaManager: ReplicationQuotaManager,
+                     val brokerTopicStats: BrokerTopicStats,
                      val metadataCache: MetadataCache,
                      threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   /* epoch of the controller that last changed the leader */
@@ -264,7 +265,7 @@ class ReplicaManager(val config: KafkaConfig,
             removedPartition.delete() // this will delete the local log
             val topicHasPartitions = allPartitions.keys.exists(tp => topicPartition.topic == tp.topic)
             if (!topicHasPartitions)
-              BrokerTopicStats.removeMetrics(topicPartition.topic)
+              brokerTopicStats.removeMetrics(topicPartition.topic)
           }
         }
       case None =>
@@ -503,8 +504,8 @@ class ReplicaManager(val config: KafkaConfig,
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
     trace("Append [%s] to local log ".format(entriesPerPartition))
     entriesPerPartition.map { case (topicPartition, records) =>
-      BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark()
-      BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
+      brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
+      brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
 
       // reject appending to internal topics if it is not allowed
       if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
@@ -529,10 +530,10 @@ class ReplicaManager(val config: KafkaConfig,
               info.lastOffset - info.firstOffset + 1
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
-          BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
-          BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(records.sizeInBytes)
-          BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
-          BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
+          brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
+          brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
+          brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
+          brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)
 
           trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
             .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
@@ -552,8 +553,8 @@ class ReplicaManager(val config: KafkaConfig,
                    _: InvalidTimestampException) =>
             (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
           case t: Throwable =>
-            BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark()
-            BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
+            brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
+            brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
             error("Error processing append operation on partition %s".format(topicPartition), t)
             (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
         }
@@ -649,8 +650,8 @@ class ReplicaManager(val config: KafkaConfig,
       val partitionFetchSize = fetchInfo.maxBytes
       val followerLogStartOffset = fetchInfo.logStartOffset
 
-      BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()
-      BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
+      brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()
+      brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
 
       try {
         trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
@@ -726,8 +727,8 @@ class ReplicaManager(val config: KafkaConfig,
                         readSize = partitionFetchSize,
                         exception = Some(e))
         case e: Throwable =>
-          BrokerTopicStats.getBrokerTopicStats(tp.topic).failedFetchRequestRate.mark()
-          BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
+          brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
+          brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
           error(s"Error processing fetch operation on partition $tp, offset $offset", e)
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                         hw = -1L,
@@ -1119,8 +1120,8 @@ object OffsetsForLeaderEpoch extends Logging {
       val offset = try {
         new EpochEndOffset(NONE, replicaManager.getLeaderReplicaIfLocal(tp).epochs.get.endOffsetFor(epoch))
       } catch {
-        case e: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
-        case e: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+        case _: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+        case _: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
       }
       (tp, offset)
     }.toMap.asJava

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 9a04e67..cd0c74b 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -111,7 +111,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
 
       override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
         new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
-          quotaManagers.follower, metadataCache) {
+          quotaManagers.follower, new BrokerTopicStats, metadataCache) {
 
           override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
                                                              quotaManager: ReplicationQuotaManager) =

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 806702d..7583d45 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 import java.util.concurrent.atomic._
 
 import kafka.log._
+import kafka.server.BrokerTopicStats
 import kafka.utils._
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
 import org.apache.kafka.common.record.FileRecords
@@ -46,7 +47,8 @@ object StressTestLog {
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
-                      time = time)
+                      time = time,
+                      brokerTopicStats = new BrokerTopicStats)
     val writer = new WriterThread(log)
     writer.start()
     val reader = new ReaderThread(log)

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 69152e3..658e3a0 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -25,6 +25,7 @@ import java.util.{Properties, Random}
 import joptsimple._
 import kafka.log._
 import kafka.message._
+import kafka.server.BrokerTopicStats
 import kafka.utils._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -206,7 +207,7 @@ object TestLinearWriteSpeed {
   
   class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
     Utils.delete(dir)
-    val log = new Log(dir, config, 0L, 0L, scheduler, Time.SYSTEM)
+    val log = new Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM)
     def write(): Int = {
       log.appendAsLeader(messages, leaderEpoch = 0)
       messages.sizeInBytes

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 3c9ddd1..f796042 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -20,6 +20,7 @@ import java.io.File
 import java.nio.file.Files
 import java.util.Properties
 
+import kafka.server.BrokerTopicStats
 import kafka.utils.{MockTime, Pool, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
@@ -95,7 +96,8 @@ abstract class AbstractLogCleanerIntegrationTest {
         logStartOffset = 0L,
         recoveryPoint = 0L,
         scheduler = time.scheduler,
-        time = time)
+        time = time,
+        brokerTopicStats = new BrokerTopicStats)
       logMap.put(partition, log)
       this.logs += log
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index a42ae22..ee46341 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -25,10 +25,12 @@ import org.junit.Assert._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import org.apache.kafka.common.record.{SimpleRecord, CompressionType, MemoryRecords}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.utils.Utils
 import java.util.{Collection, Properties}
 
+import kafka.server.BrokerTopicStats
+
 import scala.collection.JavaConverters._
 
 @RunWith(value = classOf[Parameterized])
@@ -53,7 +55,8 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
     val logProps = new Properties()
     logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
     /*configure broker-side compression  */
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      time = time, brokerTopicStats = new BrokerTopicStats)
 
     /* append two messages */
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0,

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 9f9d982..d577f02 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -20,6 +20,7 @@ package kafka.log
 import java.io.File
 import java.util.Properties
 
+import kafka.server.BrokerTopicStats
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
@@ -234,12 +235,14 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
       logStartOffset = 0L,
       recoveryPoint = 0L,
       scheduler = time.scheduler,
-      time = time)
+      time = time,
+      brokerTopicStats = new BrokerTopicStats)
     log
   }
 
   private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      time = time, brokerTopicStats = new BrokerTopicStats)
 
   private def records(key: Int, value: Int, timestamp: Long) =
     MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index c842fc3..704aa73 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -23,6 +23,7 @@ import java.nio.file.Paths
 import java.util.Properties
 
 import kafka.common._
+import kafka.server.BrokerTopicStats
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record._
@@ -974,7 +975,8 @@ class LogCleanerTest extends JUnitSuite {
     messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
 
   private def makeLog(dir: File = dir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      time = time, brokerTopicStats = new BrokerTopicStats)
 
   private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */  }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index e545255..d7ca029 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -27,7 +27,7 @@ import kafka.common.KafkaException
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import kafka.utils._
-import kafka.server.KafkaConfig
+import kafka.server.{BrokerTopicStats, KafkaConfig}
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record._
@@ -45,6 +45,7 @@ class LogTest {
   val time = new MockTime()
   var config: KafkaConfig = null
   val logConfig = LogConfig()
+  val brokerTopicStats = new BrokerTopicStats
 
   @Before
   def setUp() {
@@ -54,6 +55,7 @@ class LogTest {
 
   @After
   def tearDown() {
+    brokerTopicStats.close()
     Utils.delete(tmpDir)
   }
 
@@ -100,6 +102,7 @@ class LogTest {
                       recoveryPoint = 0L,
                       maxProducerIdExpirationMs = 24 * 60,
                       scheduler = time.scheduler,
+                      brokerTopicStats = brokerTopicStats,
                       time = time)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     // Test the segment rolling behavior when messages do not have a timestamp.
@@ -151,7 +154,8 @@ class LogTest {
       LogConfig(logProps),
       recoveryPoint = 0L,
       scheduler = time.scheduler,
-      time = time)
+      time = time,
+      brokerTopicStats = new BrokerTopicStats)
 
     val pid = 1L
     val epoch: Short = 0
@@ -410,6 +414,7 @@ class LogTest {
       LogConfig(logProps),
       recoveryPoint = 0L,
       scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
       time = time)
 
     val pid = 1L
@@ -485,6 +490,7 @@ class LogTest {
       LogConfig(logProps),
       recoveryPoint = 0L,
       scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
       time = time)
 
     val epoch: Short = 0
@@ -601,6 +607,7 @@ class LogTest {
       LogConfig(logProps),
       recoveryPoint = 0L,
       scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
       time = time)
 
     val pid = 1L
@@ -632,6 +639,7 @@ class LogTest {
       logStartOffset = 0L,
       recoveryPoint = 0L,
       scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
       time = time)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     log.appendAsLeader(set, leaderEpoch = 0)
@@ -661,7 +669,8 @@ class LogTest {
     // We use need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     // create a log
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -676,7 +685,8 @@ class LogTest {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds), leaderEpoch = 0)
   }
 
@@ -689,7 +699,8 @@ class LogTest {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
     for(value <- values)
@@ -713,7 +724,8 @@ class LogTest {
   def testAppendAndReadWithNonSequentialOffsets() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -738,7 +750,8 @@ class LogTest {
   def testReadAtLogGap() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
@@ -755,7 +768,8 @@ class LogTest {
   def testReadWithMinMessage() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -783,7 +797,8 @@ class LogTest {
   def testReadWithTooSmallMaxLength() {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -819,7 +834,8 @@ class LogTest {
 
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0)
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
@@ -850,7 +866,8 @@ class LogTest {
     /* create a multipart log with 100 messages */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
                                                                                 timestamp = time.milliseconds))
@@ -888,7 +905,8 @@ class LogTest {
     /* this log should roll after every messageset */
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
@@ -914,7 +932,8 @@ class LogTest {
       val logProps = new Properties()
       logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
       logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
-      val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+      val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+        brokerTopicStats = brokerTopicStats, time = time)
       for(i <- 0 until messagesToAppend)
         log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10), leaderEpoch = 0)
 
@@ -950,7 +969,8 @@ class LogTest {
     logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
     // We use need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
 
     try {
       log.appendAsLeader(messageSet, leaderEpoch = 0)
@@ -977,7 +997,8 @@ class LogTest {
     val logProps = new Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
 
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
 
     try {
       log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
@@ -1019,7 +1040,8 @@ class LogTest {
     val maxMessageSize = second.sizeInBytes - 1
     val logProps = new Properties()
     logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
 
     // should be able to append the small message
     log.appendAsLeader(first, leaderEpoch = 0)
@@ -1045,7 +1067,8 @@ class LogTest {
     logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
         timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
@@ -1071,12 +1094,14 @@ class LogTest {
       assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
     }
 
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler, time = time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     verifyRecoveredLog(log)
     log.close()
 
     // test recovery case
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     verifyRecoveredLog(log)
     log.close()
   }
@@ -1092,7 +1117,8 @@ class LogTest {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
 
     val messages = (0 until numMessages).map { i =>
       MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
@@ -1116,7 +1142,8 @@ class LogTest {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
     val indexFiles = log.logSegments.map(_.index.file)
@@ -1128,7 +1155,8 @@ class LogTest {
     timeIndexFiles.foreach(_.delete())
 
     // reopen the log
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
@@ -1155,7 +1183,8 @@ class LogTest {
     logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
@@ -1165,7 +1194,8 @@ class LogTest {
     timeIndexFiles.foreach(_.delete())
 
     // The rebuilt time index should be empty
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler, time = time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     val segArray = log.logSegments.toArray
     for (i <- 0 until segArray.size - 1) {
       assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
@@ -1186,7 +1216,8 @@ class LogTest {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
 
     val config = LogConfig(logProps)
-    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
     val indexFiles = log.logSegments.map(_.index.file)
@@ -1208,7 +1239,8 @@ class LogTest {
     }
 
     // reopen the log
-    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler, time = time)
+    log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
@@ -1234,7 +1266,8 @@ class LogTest {
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
 
     // create a log
-    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (_ <- 1 to msgPerSeg)
@@ -1289,7 +1322,8 @@ class LogTest {
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, setSize - 1: java.lang.Integer)
     val config = LogConfig(logProps)
-    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
@@ -1336,6 +1370,7 @@ class LogTest {
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
+                      brokerTopicStats = brokerTopicStats,
                       time = time)
 
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
@@ -1368,6 +1403,7 @@ class LogTest {
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
+                      brokerTopicStats = brokerTopicStats,
                       time = time)
 
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
@@ -1379,6 +1415,7 @@ class LogTest {
                   logStartOffset = 0L,
                   recoveryPoint = 0L,
                   scheduler = time.scheduler,
+                  brokerTopicStats = brokerTopicStats,
                   time = time)
     log.truncateTo(3)
     assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
@@ -1405,6 +1442,7 @@ class LogTest {
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
+                      brokerTopicStats = brokerTopicStats,
                       time = time)
 
     // append some messages to create some segments
@@ -1446,6 +1484,7 @@ class LogTest {
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
+                      brokerTopicStats = brokerTopicStats,
                       time = time)
 
     // append some messages to create some segments
@@ -1461,6 +1500,7 @@ class LogTest {
                   logStartOffset = 0L,
                   recoveryPoint = 0L,
                   scheduler = time.scheduler,
+                  brokerTopicStats = brokerTopicStats,
                   time = time)
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
   }
@@ -1472,6 +1512,7 @@ class LogTest {
                       logStartOffset = 0L,
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
+                      brokerTopicStats = brokerTopicStats,
                       time = time)
     log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
     val head = log.read(0, 4096, None).records.records.iterator.next()
@@ -1486,6 +1527,7 @@ class LogTest {
       logStartOffset = 0L,
       recoveryPoint = 0L,
       scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
       time = time)
     val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
@@ -1500,6 +1542,7 @@ class LogTest {
       logStartOffset = 0L,
       recoveryPoint = 0L,
       scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
       time = time)
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0)
@@ -1523,6 +1566,7 @@ class LogTest {
                         logStartOffset = 0L,
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
+                        brokerTopicStats = brokerTopicStats,
                         time = time)
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
@@ -1535,7 +1579,7 @@ class LogTest {
       TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
 
       // attempt recovery
-      log = new Log(logDir, config, 0L, recoveryPoint, time.scheduler, time)
+      log = new Log(logDir, config, 0L, recoveryPoint, time.scheduler, brokerTopicStats, time)
       assertEquals(numMessages, log.logEndOffset)
 
       val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList
@@ -1566,6 +1610,7 @@ class LogTest {
       logStartOffset = 0L,
       recoveryPoint = 0L,
       scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
       time = time)
     val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
     val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, 0, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
@@ -1617,6 +1662,7 @@ class LogTest {
       logStartOffset = 0L,
       recoveryPoint = 0L,
       scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
       time = time)
     for (_ <- 0 until 100)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
@@ -1625,7 +1671,7 @@ class LogTest {
     // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
     // clean shutdown file exists.
     recoveryPoint = log.logEndOffset
-    log = new Log(logDir, config, 0L, 0L, time.scheduler, time)
+    log = new Log(logDir, config, 0L, 0L, time.scheduler, brokerTopicStats, time)
     assertEquals(recoveryPoint, log.logEndOffset)
     cleanShutdownFile.delete()
   }
@@ -1795,6 +1841,7 @@ class LogTest {
       logStartOffset = 0L,
       recoveryPoint = 0L,
       scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
       time = time)
 
     // append some messages to create some segments
@@ -1924,7 +1971,7 @@ class LogTest {
 
   @Test
   def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() {
-    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes,timestamp = 10L)
+    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
     val log = createLog(createRecords.sizeInBytes,
       retentionMs = 10000,
       cleanupPolicy = "compact,delete")
@@ -1943,7 +1990,8 @@ class LogTest {
 
     //Given this partition is on leader epoch 72
     val epoch = 72
-    val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     log.leaderEpochCache.assign(epoch, records.size)
 
     //When appending messages as a leader (i.e. assignOffsets = true)
@@ -1975,7 +2023,8 @@ class LogTest {
       recs
     }
 
-    val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
 
     //When appending as follower (assignOffsets = false)
     for (i <- records.indices)
@@ -2034,7 +2083,8 @@ class LogTest {
   def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * createRecords.sizeInBytes).toString)
-    val log = new Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     val cache = epochCache(log)
 
     //Given 2 segments, 10 messages per segment
@@ -2079,7 +2129,8 @@ class LogTest {
     */
   @Test
   def testLogRecoversForLeaderEpoch() {
-    val log = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     val leaderEpochCache = epochCache(log)
     val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0)
     log.appendAsFollower(records = firstBatch)
@@ -2101,7 +2152,8 @@ class LogTest {
     log.close()
 
     // reopen the log and recover from the beginning
-    val recoveredLog = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val recoveredLog = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats, time = time)
     val recoveredLeaderEpochCache = epochCache(recoveredLog)
 
     // epoch entries should be recovered
@@ -2430,15 +2482,15 @@ class LogTest {
     logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
     val config = LogConfig(logProps)
-    val log = new Log(logDir,
+    new Log(logDir,
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
       scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
       time = time,
       maxProducerIdExpirationMs = maxPidExpirationMs,
       producerIdExpirationCheckIntervalMs = pidExpirationCheckIntervalMs)
-    log
   }
 
   private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 745fea6..37c2619 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -86,7 +86,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     // Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
     TestUtils.produceMessages(servers, topic, nMessages)
     assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
-    assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic))
+    servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic)))
     AdminUtils.deleteTopic(zkUtils, topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 55cfa27..b6b40c2 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -61,7 +61,7 @@ class HighwatermarkPersistenceTest {
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
       logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
-      new MetadataCache(configs.head.brokerId))
+      new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -106,7 +106,7 @@ class HighwatermarkPersistenceTest {
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
       scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
-      new MetadataCache(configs.head.brokerId))
+      new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 0c62a50..da94569 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -55,7 +55,8 @@ class IsrExpirationTest {
   @Before
   def setUp() {
     replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, null, new AtomicBoolean(false),
-      QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
+      QuotaFactory.instantiate(configs.head, metrics, time).follower, new BrokerTopicStats,
+      new MetadataCache(configs.head.brokerId))
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 9f7a47a..e770106 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -175,7 +175,8 @@ class ReplicaManagerQuotasTest {
     replay(logManager)
 
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
+      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
+      new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
 
     //create the two replicas
     for ((p, _) <- fetchInfo) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 4886b94..6efd0a3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -64,7 +64,8 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
+      new MetadataCache(config.brokerId))
     try {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
       partition.getOrCreateReplica(1)
@@ -82,7 +83,8 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
+      new MetadataCache(config.brokerId))
     try {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
       partition.getOrCreateReplica(1)
@@ -99,7 +101,8 @@ class ReplicaManagerTest {
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId), Option(this.getClass.getName))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
+      new MetadataCache(config.brokerId), Option(this.getClass.getName))
     try {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
         assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS)
@@ -132,7 +135,8 @@ class ReplicaManagerTest {
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
     EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache)
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
+      metadataCache)
 
     try {
       var produceCallbackFired = false
@@ -211,7 +215,8 @@ class ReplicaManagerTest {
     EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
     EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
+      metadataCache, Option(this.getClass.getName))
 
     try {
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
@@ -338,7 +343,8 @@ class ReplicaManagerTest {
     EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(2))).andReturn(true).anyTimes()
     EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName))
+      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
+      metadataCache, Option(this.getClass.getName))
 
     try {
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index d7822c1..ac851d8 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -99,7 +99,8 @@ class SimpleFetchTest {
 
     // create the replica manager
     replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
-      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
+      new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new BrokerTopicStats,
+      new MetadataCache(configs.head.brokerId))
 
     // add the partition with two replicas, both in ISR
     val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
@@ -150,8 +151,9 @@ class SimpleFetchTest {
    */
   @Test
   def testReadFromLog() {
-    val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()
-    val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
+    val brokerTopicStats = new BrokerTopicStats
+    val initialTopicCount = brokerTopicStats.topicStats(topic).totalFetchRequestRate.count()
+    val initialAllTopicsCount = brokerTopicStats.allTopicsStats.totalFetchRequestRate.count()
 
     val readCommittedRecords = replicaManager.readFromLocalLog(
       replicaId = Request.OrdinaryConsumerId,
@@ -178,7 +180,7 @@ class SimpleFetchTest {
     assertEquals("Reading any data can return messages up to the end of the log", recordToLEO,
       new SimpleRecord(firstRecord))
 
-    assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
-    assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())
+    assertEquals("Counts should increment after fetch", initialTopicCount+2, brokerTopicStats.topicStats(topic).totalFetchRequestRate.count())
+    assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, brokerTopicStats.allTopicsStats.totalFetchRequestRate.count())
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/46aa88b9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e2511d8..012fdfd 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -975,7 +975,8 @@ object TestUtils extends Logging {
                    maxPidExpirationMs = 60 * 60 * 1000,
                    scheduler = time.scheduler,
                    time = time,
-                   brokerState = BrokerState())
+                   brokerState = BrokerState(),
+                   brokerTopicStats = new BrokerTopicStats)
   }
 
   @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")


Mime
View raw message