kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sriram...@apache.org
Subject [13/19] git commit: some more changes
Date Tue, 25 Feb 2014 08:27:23 GMT
some more changes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e88f1acd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e88f1acd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e88f1acd

Branch: refs/heads/trunk
Commit: e88f1acdaac9fe738787ccb8375519293c913a37
Parents: 425af9b
Author: Sriram Subramanian <sriram.sub@gmail.com>
Authored: Fri Dec 20 11:22:26 2013 -0800
Committer: Sriram Subramanian <sriram.sub@gmail.com>
Committed: Fri Dec 20 11:22:26 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/controller/KafkaController.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e88f1acd/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 74e2ea4..8017abb 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -481,7 +481,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends
Logg
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance:
Boolean = true) {
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -489,7 +489,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends
Logg
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for
partitions %s".format(partitions.mkString(",")), e)
     } finally {
-      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
+      removePartitionsFromPreferredReplicaElection(partitions)
     }
   }
 
@@ -758,8 +758,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends
Logg
     }
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
-                                                   isTriggeredByAutoRebalance : Boolean)
{
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition])
{
     for(partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -770,8 +769,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends
Logg
         warn("Partition %s failed to complete preferred replica leader election. Leader is
%d".format(partition, currentLeader))
       }
     }
-    if (isTriggeredByAutoRebalance)
-      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
@@ -973,11 +971,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient)
extends Logg
                   ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
                   info("Created preferred replica election path with %s".format(jsonData))
                 } catch {
-                  case e2: Throwable =>
+                  case e2: ZkNodeExistsException =>
                     val partitionsUndergoingPreferredReplicaElection =
                       PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient,
zkPath)._1)
                     error("Preferred replica leader election currently in progress for "
+
                           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
+                  case e3: Throwable =>
+                    error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
                 }
               }
             }


Mime
View raw message