kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-3085; BrokerChangeListener computes inconsistent live/dead broker list
Date Mon, 11 Jan 2016 23:00:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f009c3094 -> e789a35d3


KAFKA-3085; BrokerChangeListener computes inconsistent live/dead broker list

Author: David Jacot <david.jacot@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #752 from dajac/KAFKA-3085


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e789a35d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e789a35d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e789a35d

Branch: refs/heads/trunk
Commit: e789a35d3bce916f25705915b5e86353462a1454
Parents: f009c30
Author: David Jacot <david.jacot@gmail.com>
Authored: Mon Jan 11 15:00:48 2016 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Jan 11 15:00:48 2016 -0800

----------------------------------------------------------------------
 .../kafka/controller/ReplicaStateMachine.scala      | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e789a35d/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 8bb9099..8eba704 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -357,16 +357,18 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging
{
         if (hasStarted.get) {
           ControllerStats.leaderElectionTimer.time {
             try {
+              val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet
-              val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
-              val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
-              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
-              val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
-              controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
+              val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
+              val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
+              val curBrokers = curBrokerIds.flatMap(zkUtils.getBrokerInfo)
+              val brokerById = curBrokers.map(broker => broker.id -> broker).toMap
+              val newBrokers = newBrokerIds.flatMap(brokerById.get)
+              controllerContext.liveBrokers = curBrokers
               info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                 .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
-              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
-              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
+              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
+              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
               if(newBrokerIds.size > 0)
                 controller.onBrokerStartup(newBrokerIds.toSeq)
               if(deadBrokerIds.size > 0)


Mime
View raw message