kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sriram...@apache.org
Subject [11/19] git commit: Address code review feedbacks
Date Tue, 25 Feb 2014 08:27:21 GMT
Address code review feedbacks


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55d77c67
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55d77c67
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55d77c67

Branch: refs/heads/trunk
Commit: 55d77c67c236488df2b9f6bb59d99eeb645a0553
Parents: 7f32a1c
Author: Sriram Subramanian <sriram.sub@gmail.com>
Authored: Mon Dec 9 22:51:41 2013 -0800
Committer: Sriram Subramanian <sriram.sub@gmail.com>
Committed: Mon Dec 9 22:51:41 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 87 +++++++++++++-------
 1 file changed, 56 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/55d77c67/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6cd58a4..522e6c7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -143,6 +143,22 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   )
 
+  newGauge(
+    "PreferredReplicaImbalanceCount",
+    new Gauge[Int] {
+      def value(): Int = {
+        controllerContext.controllerLock synchronized {
+          if (!isActive())
+            0
+          else
+            controllerContext.partitionReplicaAssignment.count {
+              case (topicPartition, replicas) => controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head
+            }
+        }
+      }
+    }
+  )
+
   def epoch = controllerContext.epoch
 
   def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
@@ -465,7 +481,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], updateZk: Boolean = true) {
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -473,7 +489,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
-      removePartitionsFromPreferredReplicaElection(partitions, updateZk)
+      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
     }
   }
 
@@ -742,7 +758,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition], updateZK : Boolean) {
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
+                                                   isTriggeredByAutoRebalance : Boolean) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -753,7 +770,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
-    if (updateZK)
+    if (isTriggeredByAutoRebalance)
       ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
@@ -913,42 +930,50 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   private def checkAndTriggerPartitionRebalance(): Unit = {
     if (isActive()) {
-      info("checking need to trigger partition rebalance")
+      trace("checking need to trigger partition rebalance")
       // get all the active brokers
-      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null;
+      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
       controllerContext.controllerLock synchronized {
-        preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy(_._2.head)
+        preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy {
+          case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+        }
       }
       debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
       // for each broker, check if a preferred replica election needs to be triggered
-      preferredReplicasForTopicsByBrokers.foreach( brokerInfo => {
-        var imbalanceRatio: Double = 0
-        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
-        controllerContext.controllerLock synchronized {
-          val brokerIds = controllerContext.liveBrokerIds
-          if (brokerIds.contains(brokerInfo._1) &&
-              controllerContext.partitionsBeingReassigned.size == 0) {
-            // do this check only if the broker is live and there are no partitions being reassigned currently
-            topicsNotInPreferredReplica =
-              brokerInfo._2.filter(item => controllerContext.partitionLeadershipInfo(item._1).leaderAndIsr.leader != brokerInfo._1);
-            debug("topics not in preferred replica " + topicsNotInPreferredReplica)
-            val totalTopicPartitionsForBroker = brokerInfo._2.size
-            val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
-            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
-            info("leader imbalance ratio for broker %d is %f".format(brokerInfo._1, imbalanceRatio))
+      preferredReplicasForTopicsByBrokers.foreach {
+        case(leaderBroker, topicAndPartitionsForBroker) => {
+          var imbalanceRatio: Double = 0
+          var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
+          controllerContext.controllerLock synchronized {
+            if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                controllerContext.partitionsBeingReassigned.size == 0) {
+              // do this check only if the broker is live and there are no partitions being reassigned currently
+              topicsNotInPreferredReplica =
+                topicAndPartitionsForBroker.filter {
+                  case(topicPartition, replicas) => {
+                    controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+                  }
+                }
+              debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+              val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
+              val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+              imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+              trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
+            }
           }
-        }
-        // check ratio and if greater than desired ratio, trigger a rebalance for the topics
-        // that need to be on this broker
-        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-          topicsNotInPreferredReplica.foreach(topicPartition => {
-            controllerContext.controllerLock synchronized {
-              onPreferredReplicaElection(Set(topicPartition._1), false)
+          // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
+          // that need to be on this broker
+          if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+            topicsNotInPreferredReplica.foreach {
+              case(topicPartition, replicas) => {
+                controllerContext.controllerLock synchronized {
+                  onPreferredReplicaElection(Set(topicPartition), false)
+                }
+              }
             }
-          })
+          }
         }
       }
-      )
     }
   }
 }


Mime
View raw message