kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-831 Controller does not send the complete list of partitions to a newly started broker; reviewed by Jun Rao
Date Thu, 28 Mar 2013 23:29:55 GMT
Updated Branches:
  refs/heads/0.8 66b103895 -> 1d5e95f6c

KAFKA-831 Controller does not send the complete list of partitions to a newly started broker;
reviewed by Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d5e95f6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d5e95f6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d5e95f6

Branch: refs/heads/0.8
Commit: 1d5e95f6c4067884a374deddd88ec3f471664658
Parents: 66b1038
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Thu Mar 28 16:29:37 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Thu Mar 28 16:29:37 2013 -0700

 .../scala/kafka/controller/KafkaController.scala   |    7 +++++--
 1 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 9d32901..47d4d7b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -273,9 +273,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
     val newBrokersSet = newBrokers.toSet
-    // update partition state machine
-    partitionStateMachine.triggerOnlinePartitionStateChange()
+    // the very first thing to do when a new broker comes up is send it the entire list of
partitions that it is
+    // supposed to host. Based on that the broker starts the high watermark threads for the
input list of partitions
     replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq,
newBrokers), OnlineReplica)
+    // when a new broker comes up, the controller needs to trigger leader election for all
new and offline partitions
+    // to see if these brokers can become leaders for some/all of those
+    partitionStateMachine.triggerOnlinePartitionStateChange()
     // check if reassignment of some partitions need to be restarted
     val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter{

View raw message