kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sriram...@apache.org
Subject git commit: auto rebalance last commit
Date Tue, 25 Feb 2014 08:44:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.1 a2745382d -> b5971264f


auto rebalance last commit


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5971264
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5971264
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5971264

Branch: refs/heads/0.8.1
Commit: b5971264f29c6646bc543b47c58786f6322f0bd0
Parents: a274538
Author: Sriram Subramanian <sriram.sub@gmail.com>
Authored: Tue Feb 25 00:36:48 2014 -0800
Committer: Sriram Subramanian <sriram.sub@gmail.com>
Committed: Tue Feb 25 00:36:48 2014 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 44 +++++++++-----------
 1 file changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b5971264/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 00a1f98..f12ffc2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -603,7 +603,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -612,7 +612,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
-      removePartitionsFromPreferredReplicaElection(partitions)
+      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
       deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
     }
   }
@@ -914,7 +914,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
+                                                   isTriggeredByAutoRebalance : Boolean) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -925,7 +926,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
-    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+    if (!isTriggeredByAutoRebalance)
+      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
@@ -1090,6 +1092,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             topicsNotInPreferredReplica =
               topicAndPartitionsForBroker.filter {
                 case(topicPartition, replicas) => {
+                  controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                   controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
                 }
               }
@@ -1102,26 +1105,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
           // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
           // that need to be on this broker
           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-            inLock(controllerContext.controllerLock) {
-              // do this check only if the broker is live and there are no partitions being reassigned currently
-              // and preferred replica election is not in progress
-              if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                  controllerContext.partitionsBeingReassigned.size == 0 &&
-                  controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
-                val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
-                val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
-                val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
-                try {
-                  ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
-                  info("Created preferred replica election path with %s".format(jsonData))
-                } catch {
-                  case e2: ZkNodeExistsException =>
-                    val partitionsUndergoingPreferredReplicaElection =
-                      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
-                    error("Preferred replica leader election currently in progress for " +
-                          "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
-                  case e3: Throwable =>
-                    error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
+            topicsNotInPreferredReplica.foreach {
+              case(topicPartition, replicas) => {
+                inLock(controllerContext.controllerLock) {
+                  // do this check only if the broker is live and there are no partitions being reassigned currently
+                  // and preferred replica election is not in progress
+                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                      controllerContext.partitionsBeingReassigned.size == 0 &&
+                      controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
+                      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+                      !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
+                      controllerContext.allTopics.contains(topicPartition.topic)) {
+                    onPreferredReplicaElection(Set(topicPartition), false)
+                  }
                 }
               }
             }


Mime
View raw message