kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-5341; Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics (KIP-164)
Date Fri, 28 Jul 2017 18:28:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4059fa576 -> 4003c9384


KAFKA-5341; Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics (KIP-164)

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3583 from lindong28/KAFKA-5341


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4003c938
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4003c938
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4003c938

Branch: refs/heads/trunk
Commit: 4003c9384bcf452ba7eb4072b96ad731b13da035
Parents: 4059fa5
Author: Dong Lin <lindong28@gmail.com>
Authored: Fri Jul 28 11:28:33 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Jul 28 11:28:33 2017 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/cluster/Partition.scala | 19 +++++++++++++++++++
 .../main/scala/kafka/server/ReplicaManager.scala |  9 +++++++++
 docs/ops.html                                    | 10 ++++++++++
 3 files changed, 38 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4003c938/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index fd3606c..925c8f0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -94,6 +94,15 @@ class Partition(val topic: String,
       tags
     )
 
+    newGauge("UnderMinIsr",
+      new Gauge[Int] {
+        def value = {
+          if (isUnderMinIsr) 1 else 0
+        }
+      },
+      tags
+    )
+
     newGauge("ReplicasCount",
       new Gauge[Int] {
         def value = {
@@ -120,6 +129,15 @@ class Partition(val topic: String,
   def isUnderReplicated: Boolean =
     isLeaderReplicaLocal && inSyncReplicas.size < assignedReplicas.size
 
+  def isUnderMinIsr: Boolean = {
+    leaderReplicaIfLocal match {
+      case Some(leaderReplica) =>
+        inSyncReplicas.size < leaderReplica.log.get.config.minInSyncReplicas
+      case None =>
+        false
+    }
+  }
+
   def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica
= {
     assignedReplicaMap.getAndMaybePut(replicaId, {
       if (isReplicaLocal(replicaId)) {
@@ -559,6 +577,7 @@ class Partition(val topic: String,
    */
   def removePartitionMetrics() {
     removeMetric("UnderReplicated", tags)
+    removeMetric("UnderMinIsr", tags)
     removeMetric("InSyncReplicasCount", tags)
     removeMetric("ReplicasCount", tags)
     removeMetric("LastStableOffsetLag", tags)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4003c938/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5b5ec2c..b66aba0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -222,6 +222,14 @@ class ReplicaManager(val config: KafkaConfig,
       def value = underReplicatedPartitionCount
     }
   )
+
+  val underMinIsrPartitionCount = newGauge(
+    "UnderMinIsrPartitionCount",
+    new Gauge[Int] {
+      def value = getLeaderPartitions.count(_.isUnderMinIsr)
+    }
+  )
+
   val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
   val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
   val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
@@ -1266,6 +1274,7 @@ class ReplicaManager(val config: KafkaConfig,
     removeMetric("PartitionCount")
     removeMetric("OfflineReplicaCount")
     removeMetric("UnderReplicatedPartitions")
+    removeMetric("UnderMinIsrPartitionCount")
   }
 
   // High watermark do not need to be checkpointed only when under unit tests

http://git-wip-us.apache.org/repos/asf/kafka/blob/4003c938/docs/ops.html
----------------------------------------------------------------------
diff --git a/docs/ops.html b/docs/ops.html
index d10794e..6c5f316 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -729,6 +729,16 @@
         <td>0</td>
       </tr>
       <tr>
+        <td># of under minIsr partitions (|ISR| &lt min.insync.replicas)</td>
+        <td>kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount</td>
+        <td>0</td>
+      </tr>
+      <tr>
+        <td># of offline log directories</td>
+        <td>kafka.log:type=LogManager,name=OfflineLogDirectoryCount</td>
+        <td>0</td>
+      </tr>
+      <tr>
         <td>Is controller active on broker</td>
         <td>kafka.controller:type=KafkaController,name=ActiveControllerCount</td>
         <td>only one broker in the cluster should have 1</td>


Mime
View raw message