kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8392: Fix old metrics leakage by brokers that have no leadership over any partition for a topic (#6977)
Date Fri, 19 Jul 2019 22:25:40 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aa8e850  KAFKA-8392: Fix old metrics leakage by brokers that have no leadership over
any partition for a topic (#6977)
aa8e850 is described below

commit aa8e850afb7baa447882b493eae29f99a39e35a0
Author: Tu V. Tran <tuvtran97@gmail.com>
AuthorDate: Fri Jul 19 15:25:18 2019 -0700

    KAFKA-8392: Fix old metrics leakage by brokers that have no leadership over any partition
for a topic (#6977)
    
    * Added removeOldLeaderMetrics in BrokerTopicStats to remove MessagesInPerSec, BytesInPerSec,
BytesOutPerSec for any broker that is no longer a leader of any partition for a particular
topic
    
    * Modified ReplicaManager to remove the metrics of any topic that the current broker has
no leadership (meaning the broker either becomes a follower for all of the partitions in that
topic or stops being a replica)
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jun Rao <junrao@gmail.com>
---
 .../scala/kafka/server/KafkaRequestHandler.scala   |  99 ++++++++++----
 .../main/scala/kafka/server/ReplicaManager.scala   |  13 ++
 .../scala/unit/kafka/metrics/MetricsTest.scala     |  10 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 148 +++++++++++++++++++++
 4 files changed, 239 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index e0ad1b6..0397795 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -146,38 +146,63 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup
{
     case Some(topic) => Map("topic" -> topic)
   }
 
-  val messagesInRate = newMeter(BrokerTopicStats.MessagesInPerSec, "messages", TimeUnit.SECONDS,
tags)
-  val bytesInRate = newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags)
-  val bytesOutRate = newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS,
tags)
-  val bytesRejectedRate = newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS,
tags)
-  private[server] val replicationBytesInRate =
-    if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS,
tags))
+  // an internal map for "lazy initialization" of certain metrics
+  private val metricTypeMap = new Pool[String, Meter]
+
+  def messagesInRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.MessagesInPerSec,
+    newMeter(BrokerTopicStats.MessagesInPerSec, "messages", TimeUnit.SECONDS, tags))
+  def bytesInRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesInPerSec,
+    newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags))
+  def bytesOutRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesOutPerSec,
+    newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags))
+  def bytesRejectedRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesRejectedPerSec,
+    newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags))
+  private[server] def replicationBytesInRate =
+    if (name.isEmpty) Some(metricTypeMap.getAndMaybePut(
+      BrokerTopicStats.ReplicationBytesInPerSec,
+      newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags)))
     else None
-  private[server] val replicationBytesOutRate =
-    if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes",
TimeUnit.SECONDS, tags))
+  private[server] def replicationBytesOutRate =
+    if (name.isEmpty) Some(metricTypeMap.getAndMaybePut(
+      BrokerTopicStats.ReplicationBytesOutPerSec,
+      newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)))
     else None
-  val failedProduceRequestRate = newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests",
TimeUnit.SECONDS, tags)
-  val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests",
TimeUnit.SECONDS, tags)
-  val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests",
TimeUnit.SECONDS, tags)
-  val totalFetchRequestRate = newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests",
TimeUnit.SECONDS, tags)
-  val fetchMessageConversionsRate = newMeter(BrokerTopicStats.FetchMessageConversionsPerSec,
"requests", TimeUnit.SECONDS, tags)
-  val produceMessageConversionsRate = newMeter(BrokerTopicStats.ProduceMessageConversionsPerSec,
"requests", TimeUnit.SECONDS, tags)
+  def failedProduceRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.FailedProduceRequestsPerSec,
+    newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests", TimeUnit.SECONDS,
tags))
+  def failedFetchRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.FailedFetchRequestsPerSec,
+    newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags))
+  def totalProduceRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.TotalProduceRequestsPerSec,
+    newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags))
+  def totalFetchRequestRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.TotalFetchRequestsPerSec,
+    newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags))
+  def fetchMessageConversionsRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.FetchMessageConversionsPerSec,
+    newMeter(BrokerTopicStats.FetchMessageConversionsPerSec, "requests", TimeUnit.SECONDS,
tags))
+  def produceMessageConversionsRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.ProduceMessageConversionsPerSec,
+    newMeter(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests", TimeUnit.SECONDS,
tags))
+
+  // this method helps check with metricTypeMap first before deleting a metric
+  def removeMetricHelper(metricType: String, tags: scala.collection.Map[String, String]):
Unit = {
+    val metric: Meter = metricTypeMap.remove(metricType)
+    if (metric != null) {
+      removeMetric(metricType, tags)
+    }
+  }
 
   def close() {
-    removeMetric(BrokerTopicStats.MessagesInPerSec, tags)
-    removeMetric(BrokerTopicStats.BytesInPerSec, tags)
-    removeMetric(BrokerTopicStats.BytesOutPerSec, tags)
-    removeMetric(BrokerTopicStats.BytesRejectedPerSec, tags)
+    removeMetricHelper(BrokerTopicStats.MessagesInPerSec, tags)
+    removeMetricHelper(BrokerTopicStats.BytesInPerSec, tags)
+    removeMetricHelper(BrokerTopicStats.BytesOutPerSec, tags)
+    removeMetricHelper(BrokerTopicStats.BytesRejectedPerSec, tags)
     if (replicationBytesInRate.isDefined)
-      removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags)
+      removeMetricHelper(BrokerTopicStats.ReplicationBytesInPerSec, tags)
     if (replicationBytesOutRate.isDefined)
-      removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
-    removeMetric(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
-    removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
-    removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
-    removeMetric(BrokerTopicStats.TotalFetchRequestsPerSec, tags)
-    removeMetric(BrokerTopicStats.FetchMessageConversionsPerSec, tags)
-    removeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec, tags)
+      removeMetricHelper(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
+    removeMetricHelper(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
+    removeMetricHelper(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
+    removeMetricHelper(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
+    removeMetricHelper(BrokerTopicStats.TotalFetchRequestsPerSec, tags)
+    removeMetricHelper(BrokerTopicStats.FetchMessageConversionsPerSec, tags)
+    removeMetricHelper(BrokerTopicStats.ProduceMessageConversionsPerSec, tags)
   }
 }
 
@@ -218,6 +243,27 @@ class BrokerTopicStats {
     }
   }
 
+  // This method only removes metrics only used for leader
+  def removeOldLeaderMetrics(topic: String) {
+    val topicMetrics = topicStats(topic)
+    if (topicMetrics != null) {
+      topicMetrics.removeMetricHelper(BrokerTopicStats.MessagesInPerSec, topicMetrics.tags)
+      topicMetrics.removeMetricHelper(BrokerTopicStats.BytesInPerSec, topicMetrics.tags)
+      topicMetrics.removeMetricHelper(BrokerTopicStats.BytesRejectedPerSec, topicMetrics.tags)
+      topicMetrics.removeMetricHelper(BrokerTopicStats.FailedProduceRequestsPerSec, topicMetrics.tags)
+      topicMetrics.removeMetricHelper(BrokerTopicStats.TotalProduceRequestsPerSec, topicMetrics.tags)
+      topicMetrics.removeMetricHelper(BrokerTopicStats.ProduceMessageConversionsPerSec, topicMetrics.tags)
+      topicMetrics.removeMetricHelper(BrokerTopicStats.ReplicationBytesOutPerSec, topicMetrics.tags)
+    }
+  }
+
+  // This method only removes metrics only used for follower
+  def removeOldFollowerMetrics(topic: String): Unit = {
+    val topicMetrics = topicStats(topic)
+    if (topicMetrics != null)
+      topicMetrics.removeMetricHelper(BrokerTopicStats.ReplicationBytesInPerSec, topicMetrics.tags)
+  }
+
   def removeMetrics(topic: String) {
     val metrics = stats.remove(topic)
     if (metrics != null)
@@ -233,7 +279,6 @@ class BrokerTopicStats {
     }
   }
 
-
   def close(): Unit = {
     allTopicsStats.close()
     stats.values.foreach(_.close())
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index aba5aca..1e61d76 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1231,6 +1231,19 @@ class ReplicaManager(val config: KafkaConfig,
         else
           Set.empty[Partition]
 
+        /*
+         * KAFKA-8392
+         * For topic partitions of which the broker is no longer a leader, delete metrics
related to
+         * those topics. Note that this means the broker stops being either a replica or
a leader of
+         * partitions of said topics
+         */
+        val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
+        val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
+        followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
+
+        // remove metrics for brokers which are not followers of a topic
+        leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
+
         leaderAndIsrRequest.partitionStates.asScala.keys.foreach { topicPartition =>
           /*
            * If there is offline log directory, a Partition object may have been created
by getOrCreatePartition()
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 76cce0f..27e5dcd 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -19,7 +19,6 @@ package kafka.metrics
 
 import java.util.Properties
 import javax.management.ObjectName
-
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Meter, MetricPredicate}
 import org.junit.Test
@@ -110,22 +109,25 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
         logSize.map(_ > 0).getOrElse(false))
     }
 
+    // Consume messages to make bytesOut tick
+    TestUtils.consumeTopicRecords(servers, topic, nMessages)
     val initialReplicationBytesIn = meterCount(replicationBytesIn)
     val initialReplicationBytesOut = meterCount(replicationBytesOut)
     val initialBytesIn = meterCount(bytesIn)
     val initialBytesOut = meterCount(bytesOut)
 
+    // BytesOut doesn't include replication, so it shouldn't have changed
+    assertEquals(initialBytesOut, meterCount(bytesOut))
+
     // Produce a few messages to make the metrics tick
     TestUtils.generateAndProduceMessages(servers, topic, nMessages)
 
     assertTrue(meterCount(replicationBytesIn) > initialReplicationBytesIn)
     assertTrue(meterCount(replicationBytesOut) > initialReplicationBytesOut)
     assertTrue(meterCount(bytesIn) > initialBytesIn)
-    // BytesOut doesn't include replication, so it shouldn't have changed
-    assertEquals(initialBytesOut, meterCount(bytesOut))
 
     // Consume messages to make bytesOut tick
-    TestUtils.consumeTopicRecords(servers, topic, nMessages * 2)
+    TestUtils.consumeTopicRecords(servers, topic, nMessages)
 
     assertTrue(meterCount(bytesOut) > initialBytesOut)
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c82ee42..196f2dd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1095,4 +1095,152 @@ class ReplicaManagerTest {
       mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, Option(this.getClass.getName))
   }
 
+  @Test
+  def testOldLeaderLosesMetricsWhenReassignPartitions(): Unit = {
+    val controllerEpoch = 0
+    val leaderEpoch = 0
+    val leaderEpochIncrement = 1
+    val correlationId = 0
+    val controllerId = 0
+    val (rm0, rm1, _, mockTopicStats1) = prepareDifferentReplicaManagersWithMockedBrokerTopicStats()
+
+    EasyMock.expect(mockTopicStats1.removeOldLeaderMetrics(topic)).andVoid.once
+    EasyMock.replay(mockTopicStats1)
+
+    try {
+      // make broker 0 the leader of partition 0 and
+      // make broker 1 the leader of partition 1
+      val tp0 = new TopicPartition(topic, 0)
+      val tp1 = new TopicPartition(topic, 1)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val partition1Replicas = Seq[Integer](1, 0).asJava
+
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+        controllerId, 0, brokerEpoch,
+        collection.immutable.Map(
+          tp0 -> new LeaderAndIsrRequest.PartitionState(controllerEpoch, 0, leaderEpoch,
+            partition0Replicas, 0, partition0Replicas, true),
+          tp1 -> new LeaderAndIsrRequest.PartitionState(controllerEpoch, 1, leaderEpoch,
+            partition1Replicas, 0, partition1Replicas, true)
+        ).asJava,
+        Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
+
+      rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
+      rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
+
+      // make broker 0 the leader of partition 1 so broker 1 loses its leadership position
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId,
+        controllerEpoch, brokerEpoch,
+        collection.immutable.Map(
+          tp0 -> new LeaderAndIsrRequest.PartitionState(controllerEpoch, 0, leaderEpoch
+ leaderEpochIncrement,
+            partition0Replicas, 0, partition0Replicas, true),
+          tp1 -> new LeaderAndIsrRequest.PartitionState(controllerEpoch, 0, leaderEpoch
+ leaderEpochIncrement,
+            partition1Replicas, 0, partition1Replicas, true)
+        ).asJava,
+        Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
+
+      rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
+      rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
+    } finally {
+      rm0.shutdown()
+      rm1.shutdown()
+    }
+
+    // verify that broker 1 did remove its metrics when no longer being the leader of partition
1
+    EasyMock.verify(mockTopicStats1)
+  }
+
+  @Test
+  def testOldFollowerLosesMetricsWhenReassignPartitions(): Unit = {
+    val controllerEpoch = 0
+    val leaderEpoch = 0
+    val leaderEpochIncrement = 1
+    val correlationId = 0
+    val controllerId = 0
+    val (rm0, rm1, _, mockTopicStats1) = prepareDifferentReplicaManagersWithMockedBrokerTopicStats()
+
+    EasyMock.expect(mockTopicStats1.removeOldLeaderMetrics(topic)).andVoid.once
+    EasyMock.expect(mockTopicStats1.removeOldFollowerMetrics(topic)).andVoid.once
+    EasyMock.replay(mockTopicStats1)
+
+    try {
+      // make broker 0 the leader of partition 0 and
+      // make broker 1 the leader of partition 1
+      val tp0 = new TopicPartition(topic, 0)
+      val tp1 = new TopicPartition(topic, 1)
+      val partition0Replicas = Seq[Integer](1, 0).asJava
+      val partition1Replicas = Seq[Integer](1, 0).asJava
+
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+        controllerId, 0, brokerEpoch,
+        collection.immutable.Map(
+          tp0 -> new LeaderAndIsrRequest.PartitionState(controllerEpoch, 1, leaderEpoch,
+            partition0Replicas, 0, partition0Replicas, true),
+          tp1 -> new LeaderAndIsrRequest.PartitionState(controllerEpoch, 1, leaderEpoch,
+            partition1Replicas, 0, partition1Replicas, true)
+        ).asJava,
+        Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
+
+      rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
+      rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
+
+      // make broker 0 the leader of partition 1 so broker 1 loses its leadership position
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId,
+        controllerEpoch, brokerEpoch,
+        collection.immutable.Map(
+          tp0 -> new LeaderAndIsrRequest.PartitionState(controllerEpoch, 0, leaderEpoch
+ leaderEpochIncrement,
+            partition0Replicas, 0, partition0Replicas, true),
+          tp1 -> new LeaderAndIsrRequest.PartitionState(controllerEpoch, 0, leaderEpoch
+ leaderEpochIncrement,
+            partition1Replicas, 0, partition1Replicas, true)
+        ).asJava,
+        Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
+
+      rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
+      rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
+    } finally {
+      rm0.shutdown()
+      rm1.shutdown()
+    }
+
+    // verify that broker 1 did remove its metrics when no longer being the leader of partition
1
+    EasyMock.verify(mockTopicStats1)
+  }
+
+  private def prepareDifferentReplicaManagersWithMockedBrokerTopicStats(): (ReplicaManager,
ReplicaManager, BrokerTopicStats, BrokerTopicStats) = {
+    val props0 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    val props1 = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+
+    props0.put("log0.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    props1.put("log1.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+
+    val config0 = KafkaConfig.fromProps(props0)
+    val config1 = KafkaConfig.fromProps(props1)
+
+    val mockLogMgr0 = TestUtils.createLogManager(config0.logDirs.map(new File(_)))
+    val mockLogMgr1 = TestUtils.createLogManager(config1.logDirs.map(new File(_)))
+
+    val mockTopicStats0: BrokerTopicStats = EasyMock.createMock(classOf[BrokerTopicStats])
+    val mockTopicStats1: BrokerTopicStats = EasyMock.createMock(classOf[BrokerTopicStats])
+
+    val metadataCache0: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
+    val metadataCache1: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
+
+    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
+
+    EasyMock.expect(metadataCache0.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+    EasyMock.replay(metadataCache0)
+    EasyMock.expect(metadataCache1.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+    EasyMock.replay(metadataCache1)
+
+    // each replica manager is for a broker
+    val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new MockScheduler(time),
mockLogMgr0,
+      new AtomicBoolean(false), QuotaFactory.instantiate(config0, metrics, time, ""),
+      mockTopicStats0, metadataCache0, new LogDirFailureChannel(config0.logDirs.size))
+    val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new MockScheduler(time),
mockLogMgr1,
+      new AtomicBoolean(false), QuotaFactory.instantiate(config1, metrics, time, ""),
+      mockTopicStats1, metadataCache1, new LogDirFailureChannel(config1.logDirs.size))
+
+    (rm0, rm1, mockTopicStats0, mockTopicStats1)
+  }
+
 }


Mime
View raw message