kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-3085; BrokerChangeListener computes inconsistent live/dead broker list.
Date Mon, 18 Jan 2016 05:22:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a071e3554 -> 60a5a523b


KAFKA-3085; BrokerChangeListener computes inconsistent live/dead broker list.

Follow up PR as per comments in the ticket.

junrao It should be correct now as `curBrokers` included only live brokers and live/dead brokers
are computed based on it. Could you take a look when you have time?

Author: David Jacot <david.jacot@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #756 from dajac/KAFKA-3085


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/60a5a523
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/60a5a523
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/60a5a523

Branch: refs/heads/trunk
Commit: 60a5a523b6f7e5151d8cdc835845cfc4d2e72b52
Parents: a071e35
Author: David Jacot <david.jacot@gmail.com>
Authored: Sun Jan 17 21:22:36 2016 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Sun Jan 17 21:22:36 2016 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/controller/ReplicaStateMachine.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/60a5a523/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 8eba704..7ebece7 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -357,13 +357,12 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging
{
         if (hasStarted.get) {
           ControllerStats.leaderElectionTimer.time {
             try {
+              val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
+              val curBrokerIds = curBrokers.map(_.id)
               val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
-              val curBrokerIds = currentBrokerList.map(_.toInt).toSet
               val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
               val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
-              val curBrokers = curBrokerIds.flatMap(zkUtils.getBrokerInfo)
-              val brokerById = curBrokers.map(broker => broker.id -> broker).toMap
-              val newBrokers = newBrokerIds.flatMap(brokerById.get)
+              val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
               controllerContext.liveBrokers = curBrokers
               info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                 .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))


Mime
View raw message