kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3258; Delete broker topic metrics of deleted topics
Date Mon, 23 May 2016 21:41:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 23b70efa8 -> a86ae26fc


KAFKA-3258; Delete broker topic metrics of deleted topics

Delete per-topic metrics when there are no replicas of any partitions of the topic on a broker.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Manikumar reddy O <manikumar.reddy@gmail.com>, Ashish Singh <asingh@cloudera.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #944 from rajinisivaram/KAFKA-3258


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a86ae26f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a86ae26f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a86ae26f

Branch: refs/heads/0.10.0
Commit: a86ae26fcb18d307d5d54f7061df613ce148fc33
Parents: 23b70ef
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Mon May 16 14:37:09 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon May 23 22:21:28 2016 +0100

----------------------------------------------------------------------
 .../kafka/server/KafkaRequestHandler.scala      | 42 ++++++++++++++++----
 .../scala/kafka/server/ReplicaManager.scala     | 17 ++++----
 .../scala/unit/kafka/metrics/MetricsTest.scala  | 15 ++++++-
 3 files changed, 56 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a86ae26f/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index a1558af..f70955d 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -100,17 +100,37 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
     case Some(topic) => Map("topic" -> topic)
   }
 
-  val messagesInRate = newMeter("MessagesInPerSec", "messages", TimeUnit.SECONDS, tags)
-  val bytesInRate = newMeter("BytesInPerSec", "bytes", TimeUnit.SECONDS, tags)
-  val bytesOutRate = newMeter("BytesOutPerSec", "bytes", TimeUnit.SECONDS, tags)
-  val bytesRejectedRate = newMeter("BytesRejectedPerSec", "bytes", TimeUnit.SECONDS, tags)
-  val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
-  val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
-  val totalProduceRequestRate = newMeter("TotalProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
-  val totalFetchRequestRate = newMeter("TotalFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+  val messagesInRate = newMeter(BrokerTopicStats.MessagesInPerSec, "messages", TimeUnit.SECONDS, tags)
+  val bytesInRate = newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags)
+  val bytesOutRate = newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)
+  val bytesRejectedRate = newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags)
+  val failedProduceRequestRate = newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+  val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+  val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+  val totalFetchRequestRate = newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+
+  def close() {
+    removeMetric(BrokerTopicStats.MessagesInPerSec, tags)
+    removeMetric(BrokerTopicStats.BytesInPerSec, tags)
+    removeMetric(BrokerTopicStats.BytesOutPerSec, tags)
+    removeMetric(BrokerTopicStats.BytesRejectedPerSec, tags)
+    removeMetric(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
+    removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
+    removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
+    removeMetric(BrokerTopicStats.TotalFetchRequestsPerSec, tags)
+  }
 }
 
 object BrokerTopicStats extends Logging {
+  val MessagesInPerSec = "MessagesInPerSec"
+  val BytesInPerSec = "BytesInPerSec"
+  val BytesOutPerSec = "BytesOutPerSec"
+  val BytesRejectedPerSec = "BytesRejectedPerSec"
+  val FailedProduceRequestsPerSec = "FailedProduceRequestsPerSec"
+  val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
+  val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
+  val TotalFetchRequestsPerSec = "TotalFetchRequestsPerSec"
+
   private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
   private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
   private val allTopicsStats = new BrokerTopicMetrics(None)
@@ -120,4 +140,10 @@ object BrokerTopicStats extends Logging {
   def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
     stats.getAndMaybePut(topic)
   }
+
+  def removeMetrics(topic: String) {
+    val metrics = stats.remove(topic)
+    if (metrics != null)
+      metrics.close()
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a86ae26f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 534de27..68f2385 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -110,7 +110,9 @@ class ReplicaManager(val config: KafkaConfig,
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
-  private val allPartitions = new Pool[(String, Int), Partition]
+  private val allPartitions = new Pool[(String, Int), Partition](valueFactory = Some { case (t, p) =>
+    new Partition(t, p, time, this)
+  })
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime, threadNamePrefix)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
@@ -223,8 +225,12 @@ class ReplicaManager(val config: KafkaConfig,
       case Some(partition) =>
         if(deletePartition) {
           val removedPartition = allPartitions.remove((topic, partitionId))
-          if (removedPartition != null)
+          if (removedPartition != null) {
             removedPartition.delete() // this will delete the local log
+            val topicHasPartitions = allPartitions.keys.exists { case (t, _) => topic == t }
+            if (!topicHasPartitions)
+                BrokerTopicStats.removeMetrics(topic)
+          }
         }
       case None =>
         // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
@@ -266,12 +272,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
-    var partition = allPartitions.get((topic, partitionId))
-    if (partition == null) {
-      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
-      partition = allPartitions.get((topic, partitionId))
-    }
-    partition
+    allPartitions.getAndMaybePut((topic, partitionId))
   }
 
   def getPartition(topic: String, partitionId: Int): Option[Partition] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a86ae26f/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 3707deb..1980e8a 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package kafka.consumer
+package kafka.metrics
 
 import java.util.Properties
-
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.MetricPredicate
 import org.junit.{After, Test}
@@ -32,6 +31,7 @@ import kafka.utils.TestUtils._
 import scala.collection._
 import scala.collection.JavaConversions._
 import scala.util.matching.Regex
+import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
 
 class MetricsTest extends KafkaServerTestHarness with Logging {
   val numNodes = 2
@@ -79,6 +79,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic))
   }
 
+  @Test
+  def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
+    val topic = "test-broker-topic-metric"
+    AdminUtils.createTopic(zkUtils, topic, 2, 1)
+    createAndShutdownStep("group0", "consumer0", "producer0")
+    assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic))
+    AdminUtils.deleteTopic(zkUtils, topic)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
+    assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic))
+  }
+
   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
   def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
     sendMessages(servers, topic, nMessages)


Mime
View raw message