kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5135; Controller Health Metrics (KIP-143)
Date Tue, 23 May 2017 23:38:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 69970630b -> 999c247e0


KAFKA-5135; Controller Health Metrics (KIP-143)

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>, Onur Karaman <okaraman@linkedin.com>

Closes #2983 from ijuma/kafka-5135-controller-health-metrics-kip-143

(cherry picked from commit 516d8457d8142111a91af94cab918c84990685da)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
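
Note for operators: the metrics added here surface through Kafka's usual
Yammer/JMX path. Below is a minimal sketch of polling the new ControllerState
gauge from inside the broker JVM; the MBean name is assumed from the
kafka.controller:type=KafkaController,name=<gauge> convention, not taken from
this commit.

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    object ControllerStateProbe {
      def main(args: Array[String]): Unit = {
        val mbs = ManagementFactory.getPlatformMBeanServer
        // Assumed name; Yammer's JmxReporter exposes gauges under the "Value" attribute.
        val name = new ObjectName("kafka.controller:type=KafkaController,name=ControllerState")
        val state = mbs.getAttribute(name, "Value").asInstanceOf[Byte]
        println(s"Controller state byte: $state")
      }
    }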


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/999c247e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/999c247e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/999c247e

Branch: refs/heads/0.11.0
Commit: 999c247e076cbf7e84141533c5a9042a844d57ae
Parents: 6997063
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Wed May 24 00:36:52 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed May 24 00:38:29 2017 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |   3 +-
 .../controller/ControllerChannelManager.scala   |  37 +-
 .../controller/ControllerEventManager.scala     |  65 +++
 .../kafka/controller/ControllerState.scala      |  83 ++++
 .../kafka/controller/KafkaController.scala      | 456 ++++++++++---------
 .../controller/PartitionLeaderSelector.scala    |   2 +-
 .../kafka/controller/TopicDeletionManager.scala |   7 +-
 .../main/scala/kafka/metrics/KafkaTimer.scala   |   8 +-
 .../scala/kafka/server/ReplicaManager.scala     |   5 +-
 .../controller/ControllerEventManagerTest.scala |  94 ++++
 .../controller/ControllerIntegrationTest.scala  |  20 +-
 ...MetricsDuringTopicCreationDeletionTest.scala |  10 +-
 12 files changed, 549 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f123a16..e617404 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -528,7 +528,7 @@ class Partition(val topic: String,
   }
 
   private def updateIsr(newIsr: Set[Replica]) {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
     val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
       newLeaderAndIsr, controllerEpoch, zkVersion)
 
@@ -538,6 +538,7 @@ class Partition(val topic: String,
       zkVersion = newVersion
       trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
     } else {
+      replicaManager.failedIsrUpdatesRate.mark()
       info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
     }
   }
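
The meter marked above is the failed-ISR-updates rate proposed in KIP-143. Its
definition lives in ReplicaManager.scala, whose diff is truncated at the end of
this message; here is a minimal sketch of how such a meter is declared with
KafkaMetricsGroup, with the metric name assumed from the KIP rather than read
from this diff.

    import java.util.concurrent.TimeUnit
    import kafka.metrics.KafkaMetricsGroup

    class ReplicaManagerMetricsSketch extends KafkaMetricsGroup {
      // mark() is called whenever a stale cached zkVersion forces the ISR update to be skipped.
      val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
    }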

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 41b9549..ea8d13b 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,9 +19,11 @@ package kafka.controller
 import java.net.SocketTimeoutException
 import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
 
+import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common.{KafkaException, TopicAndPartition}
+import kafka.metrics.KafkaMetricsGroup
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.clients._
@@ -38,11 +40,26 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.collection.{Set, mutable}
 
-class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
+object ControllerChannelManager {
+  val QueueSizeMetricName = "QueueSize"
+}
+
+class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics,
+                               threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+  import ControllerChannelManager._
   protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
   this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
 
+  newGauge(
+    "TotalQueueSize",
+    new Gauge[Int] {
+      def value: Int = brokerLock synchronized {
+        brokerStateInfo.values.iterator.map(_.messageQueue.size).sum
+      }
+    }
+  )
+
   controllerContext.liveBrokers.foreach(addNewBroker)
 
   def startup() = {
@@ -133,9 +150,21 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
       brokerNode, config, time, threadName)
     requestThread.setDaemon(false)
-    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
+
+    val queueSizeGauge = newGauge(
+      QueueSizeMetricName,
+      new Gauge[Int] {
+        def value: Int = messageQueue.size
+      },
+      queueSizeTags(broker.id)
+    )
+
+    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
+      requestThread, queueSizeGauge))
   }
 
+  private def queueSizeTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
+
   private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
     try {
       // Shutdown the RequestSendThread before closing the NetworkClient to avoid the concurrent use of the
@@ -145,6 +174,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       brokerState.requestSendThread.shutdown()
       brokerState.networkClient.close()
       brokerState.messageQueue.clear()
+      removeMetric(QueueSizeMetricName, queueSizeTags(brokerState.brokerNode.id))
       brokerStateInfo.remove(brokerState.brokerNode.id)
     } catch {
       case e: Throwable => error("Error while removing broker by the controller", e)
@@ -465,7 +495,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
 case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                      brokerNode: Node,
                                      messageQueue: BlockingQueue[QueueItem],
-                                     requestSendThread: RequestSendThread)
+                                     requestSendThread: RequestSendThread,
+                                     queueSizeGauge: Gauge[Int])
 
 case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit = null)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/ControllerEventManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
new file mode 100644
index 0000000..3c0da23
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection._
+
+import kafka.metrics.KafkaTimer
+import kafka.utils.ShutdownableThread
+
+class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
+                             eventProcessedListener: ControllerEvent => Unit) {
+
+  @volatile private var _state: ControllerState = ControllerState.Idle
+
+  private val queue = new LinkedBlockingQueue[ControllerEvent]
+  private val thread = new ControllerEventThread("controller-event-thread")
+
+  def state: ControllerState = _state
+
+  def start(): Unit = thread.start()
+
+  def close(): Unit = thread.shutdown()
+
+  def put(event: ControllerEvent): Unit = queue.put(event)
+
+  class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      val controllerEvent = queue.take()
+      _state = controllerEvent.state
+
+      try {
+        rateAndTimeMetrics(state).time {
+          controllerEvent.process()
+        }
+      } catch {
+        case e: Throwable => error(s"Error processing event $controllerEvent", e)
+      }
+
+      try eventProcessedListener(controllerEvent)
+      catch {
+        case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
+      }
+
+      _state = ControllerState.Idle
+    }
+  }
+
+}
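
ControllerEvent is sealed, so concrete events live alongside KafkaController.
The sketch below mirrors the single-threaded event-loop contract with stand-in
types; Event, MiniEventLoop and MiniEventLoopDemo are invented here for
illustration and are not part of the commit.

    import java.util.concurrent.LinkedBlockingQueue

    trait Event { def process(): Unit }

    class MiniEventLoop extends Thread("mini-event-loop") {
      private val queue = new LinkedBlockingQueue[Event]
      def put(event: Event): Unit = queue.put(event)
      override def run(): Unit =
        while (!Thread.currentThread.isInterrupted)
          try queue.take().process()
          catch { case _: InterruptedException => Thread.currentThread.interrupt() }
    }

    object MiniEventLoopDemo {
      def main(args: Array[String]): Unit = {
        // All events are processed serially on one thread; that single-writer
        // guarantee is what lets KafkaController replace its AtomicIntegers
        // with plain @volatile vars further down in this commit.
        val loop = new MiniEventLoop
        loop.start()
        loop.put(new Event { def process(): Unit = println("handled in order") })
      }
    }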

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/ControllerState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
new file mode 100644
index 0000000..2f690bb
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import scala.collection.Seq
+
+sealed abstract class ControllerState {
+
+  def value: Byte
+
+  def rateAndTimeMetricName: Option[String] =
+    if (hasRateAndTimeMetric) Some(s"${toString}RateAndTimeMs") else None
+
+  protected def hasRateAndTimeMetric: Boolean = true
+}
+
+object ControllerState {
+
+  // Note: `rateAndTimeMetricName` is based on the case object name by default. Changing a name is a breaking change
+  // unless `rateAndTimeMetricName` is overridden.
+
+  case object Idle extends ControllerState {
+    def value = 0
+    override protected def hasRateAndTimeMetric: Boolean = false
+  }
+
+  case object ControllerChange extends ControllerState {
+    def value = 1
+  }
+
+  case object BrokerChange extends ControllerState {
+    def value = 2
+    // The LeaderElectionRateAndTimeMs metric existed before `ControllerState` was introduced and we keep the name
+    // for backwards compatibility. The alternative would be to have the same metric under two different names.
+    override def rateAndTimeMetricName = Some("LeaderElectionRateAndTimeMs")
+  }
+
+  case object TopicChange extends ControllerState {
+    def value = 3
+  }
+
+  case object TopicDeletion extends ControllerState {
+    def value = 4
+  }
+
+  case object PartitionReassignment extends ControllerState {
+    def value = 5
+  }
+
+  case object AutoLeaderBalance extends ControllerState {
+    def value = 6
+  }
+
+  case object ManualLeaderBalance extends ControllerState {
+    def value = 7
+  }
+
+  case object ControlledShutdown extends ControllerState {
+    def value = 8
+  }
+
+  case object IsrChange extends ControllerState {
+    def value = 9
+  }
+
+  val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
+    PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange)
+}
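
Each state's rate-and-time metric name is derived from the case object's name,
except BrokerChange, which keeps the pre-existing LeaderElectionRateAndTimeMs
name, and Idle, which has no timer. A short sketch of the derivation:

    object MetricNameSketch {
      // Prints one line per state, e.g.:
      //   Idle -> <no metric>
      //   BrokerChange -> LeaderElectionRateAndTimeMs   (compatibility override)
      //   TopicChange -> TopicChangeRateAndTimeMs
      def main(args: Array[String]): Unit =
        kafka.controller.ControllerState.values.foreach { state =>
          println(s"$state -> ${state.rateAndTimeMetricName.getOrElse("<no metric>")}")
        }
    }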

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 69669cd..dbce485 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,10 +16,9 @@
  */
 package kafka.controller
 
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+import java.util.concurrent.TimeUnit
 
-import com.yammer.metrics.core.{Gauge, Meter}
+import com.yammer.metrics.core.Gauge
 import kafka.admin.{AdminUtils, PreferredReplicaLeaderElectionCommand}
 import kafka.api._
 import kafka.cluster.Broker
@@ -42,7 +41,7 @@ import scala.collection._
 import scala.util.Try
 
 class ControllerContext(val zkUtils: ZkUtils) {
-  val controllerStats = new ControllerStats
+  val stats = new ControllerStats
 
   var controllerChannelManager: ControllerChannelManager = null
 
@@ -158,55 +157,59 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   // kafka server
   private val kafkaScheduler = new KafkaScheduler(1)
 
-  val topicDeletionManager = new TopicDeletionManager(this)
+  private val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics())
+
+  val topicDeletionManager = new TopicDeletionManager(this, eventManager)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
 
-  private val controllerEventQueue = new LinkedBlockingQueue[ControllerEvent]
-  private val controllerEventThread = new ControllerEventThread("controller-event-thread")
-
-  private val brokerChangeListener = new BrokerChangeListener(this)
-  private val topicChangeListener = new TopicChangeListener(this)
-  private val topicDeletionListener = new TopicDeletionListener(this)
+  private val brokerChangeListener = new BrokerChangeListener(this, eventManager)
+  private val topicChangeListener = new TopicChangeListener(this, eventManager)
+  private val topicDeletionListener = new TopicDeletionListener(this, eventManager)
   private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
-  private val partitionReassignmentListener = new PartitionReassignmentListener(this)
-  private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
-  private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
+  private val partitionReassignmentListener = new PartitionReassignmentListener(this, eventManager)
+  private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this, eventManager)
+  private val isrChangeNotificationListener = new IsrChangeNotificationListener(this, eventManager)
 
-  private val activeControllerId = new AtomicInteger(-1)
-  private val offlinePartitionCount = new AtomicInteger(0)
-  private val preferredReplicaImbalanceCount = new AtomicInteger(0)
+  @volatile private var activeControllerId = -1
+  @volatile private var offlinePartitionCount = 0
+  @volatile private var preferredReplicaImbalanceCount = 0
 
   newGauge(
     "ActiveControllerCount",
     new Gauge[Int] {
-      def value() = if (isActive) 1 else 0
+      def value = if (isActive) 1 else 0
     }
   )
 
   newGauge(
     "OfflinePartitionsCount",
     new Gauge[Int] {
-      def value(): Int = {
-        offlinePartitionCount.get()
-      }
+      def value: Int = offlinePartitionCount
     }
   )
 
   newGauge(
     "PreferredReplicaImbalanceCount",
     new Gauge[Int] {
-      def value(): Int = {
-        preferredReplicaImbalanceCount.get()
-      }
+      def value: Int = preferredReplicaImbalanceCount
+    }
+  )
+
+  newGauge(
+    "ControllerState",
+    new Gauge[Byte] {
+      def value: Byte = state.value
     }
   )
 
   def epoch: Int = controllerContext.epoch
 
+  def state: ControllerState = eventManager.state
+
   def clientId: String = {
     val controllerListener = config.listeners.find(_.listenerName == config.interBrokerListenerName).getOrElse(
       throw new IllegalArgumentException(s"No listener with name ${config.interBrokerListenerName} is configured."))
@@ -223,7 +226,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
    */
   def shutdownBroker(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit): Unit = {
     val controlledShutdownEvent = ControlledShutdown(id, controlledShutdownCallback)
-    addToControllerEventQueue(controlledShutdownEvent)
+    eventManager.put(controlledShutdownEvent)
   }
 
   /**
@@ -280,7 +283,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   private def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit = {
-    kafkaScheduler.schedule("auto-leader-rebalance-task", () => addToControllerEventQueue(AutoPreferredReplicaLeaderElection),
+    kafkaScheduler.schedule("auto-leader-rebalance-task", () => eventManager.put(AutoPreferredReplicaLeaderElection),
       delay = delay, unit = unit)
   }
 
@@ -301,8 +304,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
 
     // shutdown leader rebalance scheduler
     kafkaScheduler.shutdown()
-    offlinePartitionCount.set(0)
-    preferredReplicaImbalanceCount.set(0)
+    offlinePartitionCount = 0
+    preferredReplicaImbalanceCount = 0
 
     // de-register partition ISR listener for on-going partition reassignment task
     deregisterPartitionReassignmentIsrChangeListeners()
@@ -325,7 +328,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   /**
    * Returns true if this broker is the current controller.
    */
-  def isActive: Boolean = activeControllerId.get() == config.brokerId
+  def isActive: Boolean = activeControllerId == config.brokerId
 
   /**
    * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
@@ -535,7 +538,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
                                                     partition: Int,
                                                     reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val isrChangeListener = new PartitionReassignmentIsrChangeListener(this, topic, partition, reassignedReplicas.toSet)
+    val isrChangeListener = new PartitionReassignmentIsrChangeListener(this, eventManager, topic, partition,
+      reassignedReplicas.toSet)
     reassignedPartitionContext.isrChangeListener = isrChangeListener
     // register listener on the leader and isr path to wait until they catch up with the current leader
     zkUtils.zkClient.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
@@ -589,8 +593,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
    * elector
    */
   def startup() = {
-    addToControllerEventQueue(Startup)
-    controllerEventThread.start()
+    eventManager.put(Startup)
+    eventManager.start()
   }
 
   /**
@@ -599,7 +603,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
    * shuts down the controller channel manager, if one exists (i.e. if it was the current controller)
    */
   def shutdown() = {
-    controllerEventThread.shutdown()
+    eventManager.close()
     onControllerResignation()
   }
 
@@ -640,11 +644,11 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   private def registerSessionExpirationListener() = {
-    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this))
+    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this, eventManager))
   }
 
   private def registerControllerChangeListener() = {
-    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this))
+    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager))
   }
 
   private def initializeControllerContext() {
@@ -685,7 +689,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   private def resetControllerContext(): Unit = {
-    if(controllerContext.controllerChannelManager != null) {
+    if (controllerContext.controllerChannelManager != null) {
       controllerContext.controllerChannelManager.shutdown()
       controllerContext.controllerChannelManager = null
     }
@@ -864,7 +868,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   def registerPartitionModificationsListener(topic: String) = {
-    partitionModificationsListeners.put(topic, new PartitionModificationsListener(this, topic))
+    partitionModificationsListeners.put(topic, new PartitionModificationsListener(this, eventManager, topic))
     zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
   }
 
@@ -1106,39 +1110,36 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     finalLeaderIsrAndControllerEpoch
   }
 
-  private def checkAndTriggerPartitionRebalance(): Unit = {
-    trace("checking need to trigger partition rebalance")
-    var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = controllerContext.partitionReplicaAssignment
-      .filterNot(p => topicDeletionManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
-        case (_, assignedReplicas) => assignedReplicas.head
-      }
-    debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
+  private def checkAndTriggerAutoLeaderRebalance(): Unit = {
+    trace("Checking need to trigger auto leader balancing")
+    val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] =
+      controllerContext.partitionReplicaAssignment.filterNot { case (tp, _) =>
+        topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
+      }.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
+    debug(s"Preferred replicas by broker $preferredReplicasForTopicsByBrokers")
+
     // for each broker, check if a preferred replica election needs to be triggered
-    preferredReplicasForTopicsByBrokers.foreach {
-      case(leaderBroker, topicAndPartitionsForBroker) => {
-        var imbalanceRatio: Double = 0
-        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = topicAndPartitionsForBroker
-          .filter { case (topicPartition, _) =>
-          controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
-            controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
-        }
-        debug("topics not in preferred replica " + topicsNotInPreferredReplica)
-        val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
-        val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
-        imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
-        trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
-        // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
-        // that need to be on this broker
-        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-          topicsNotInPreferredReplica.keys.foreach { topicPartition =>
-            // do this check only if the broker is live and there are no partitions being reassigned currently
-            // and preferred replica election is not in progress
-            if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                controllerContext.partitionsBeingReassigned.isEmpty &&
-                !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
-                controllerContext.allTopics.contains(topicPartition.topic)) {
-              onPreferredReplicaElection(Set(topicPartition), true)
-            }
+    preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicAndPartitionsForBroker) =>
+      val topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
+        val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
+        leadershipInfo.map(_.leaderAndIsr.leader != leaderBroker).getOrElse(false)
+      }
+      debug(s"Topics not in preferred replica $topicsNotInPreferredReplica")
+
+      val imbalanceRatio = topicsNotInPreferredReplica.size.toDouble / topicAndPartitionsForBroker.size
+      trace(s"Leader imbalance ratio for broker $leaderBroker is $imbalanceRatio")
+
+      // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
+      // that need to be on this broker
+      if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+        topicsNotInPreferredReplica.keys.foreach { topicPartition =>
+          // do this check only if the broker is live and there are no partitions being reassigned currently
+          // and preferred replica election is not in progress
+          if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+            controllerContext.partitionsBeingReassigned.isEmpty &&
+            !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+            controllerContext.allTopics.contains(topicPartition.topic)) {
+            onPreferredReplicaElection(Set(topicPartition), isTriggeredByAutoRebalance = true)
           }
         }
       }
@@ -1152,98 +1153,80 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     }
   }
 
-  def addToControllerEventQueue(controllerEvent: ControllerEvent): Unit = {
-    controllerEventQueue.put(controllerEvent)
-  }
+  case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent {
 
-  class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
-    override def doWork(): Unit = {
-      val controllerEvent = controllerEventQueue.take()
-      try {
-        controllerEvent.process()
-      } catch {
-        case e: Throwable => error("Error processing event " + controllerEvent, e)
-      }
-      updateMetrics()
-    }
-  }
+    def state = ControllerState.BrokerChange
 
-  case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent {
     override def process(): Unit = {
       if (!isActive) return
-      controllerContext.controllerStats.leaderElectionTimer.time {
-        try {
-          val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
-          val curBrokerIds = curBrokers.map(_.id)
-          val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
-          val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
-          val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
-          val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
-          controllerContext.liveBrokers = curBrokers
-          val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
-          val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
-          val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
-          info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
-            .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
-          newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
-          deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
-          if(newBrokerIds.nonEmpty)
-            onBrokerStartup(newBrokerIdsSorted)
-          if(deadBrokerIds.nonEmpty)
-            onBrokerFailure(deadBrokerIdsSorted)
-        } catch {
-          case e: Throwable => error("Error while handling broker changes", e)
-        }
-      }
+      val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
+      val curBrokerIds = curBrokers.map(_.id)
+      val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
+      val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
+      val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
+      val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
+      controllerContext.liveBrokers = curBrokers
+      val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
+      val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
+      val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
+      info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
+        .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
+      newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
+      deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
+      if (newBrokerIds.nonEmpty)
+        onBrokerStartup(newBrokerIdsSorted)
+      if (deadBrokerIds.nonEmpty)
+        onBrokerFailure(deadBrokerIdsSorted)
     }
   }
 
   case class TopicChange(topics: Set[String]) extends ControllerEvent {
+
+    def state = ControllerState.TopicChange
+
     override def process(): Unit = {
       if (!isActive) return
-      try {
-        val newTopics = topics -- controllerContext.allTopics
-        val deletedTopics = controllerContext.allTopics -- topics
-        controllerContext.allTopics = topics
-
-        val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
-        controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
-          !deletedTopics.contains(p._1.topic))
-        controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
-        info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
-          deletedTopics, addedPartitionReplicaAssignment))
-        if (newTopics.nonEmpty)
-          onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
-      } catch {
-        case e: Throwable => error("Error while handling new topic", e)
-      }
+      val newTopics = topics -- controllerContext.allTopics
+      val deletedTopics = controllerContext.allTopics -- topics
+      controllerContext.allTopics = topics
+
+      val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
+      controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
+        !deletedTopics.contains(p._1.topic))
+      controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
+      info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
+        deletedTopics, addedPartitionReplicaAssignment))
+      if (newTopics.nonEmpty)
+        onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
     }
   }
 
   case class PartitionModifications(topic: String) extends ControllerEvent {
+
+    def state = ControllerState.TopicChange
+
     override def process(): Unit = {
       if (!isActive) return
-      try {
-        val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
-        val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
-          !controllerContext.partitionReplicaAssignment.contains(p._1))
-        if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
-          error("Skipping adding partitions %s for topic %s since it is currently being deleted"
-            .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
-        else {
-          if (partitionsToBeAdded.nonEmpty) {
-            info("New partitions to be added %s".format(partitionsToBeAdded))
-            controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
-            onNewPartitionCreation(partitionsToBeAdded.keySet)
-          }
+      val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
+      val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
+        !controllerContext.partitionReplicaAssignment.contains(p._1))
+      if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
+        error("Skipping adding partitions %s for topic %s since it is currently being deleted"
+          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+      else {
+        if (partitionsToBeAdded.nonEmpty) {
+          info("New partitions to be added %s".format(partitionsToBeAdded))
+          controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
+          onNewPartitionCreation(partitionsToBeAdded.keySet)
         }
-      } catch {
-        case e: Throwable => error("Error while handling add partitions for topic " + topic, e)
       }
     }
   }
 
   case class TopicDeletion(var topicsToBeDeleted: Set[String]) extends ControllerEvent {
+
+    def state = ControllerState.TopicDeletion
+
     override def process(): Unit = {
       if (!isActive) return
       debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
@@ -1277,6 +1260,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   case class PartitionReassignment(partitionReassignment: Map[TopicAndPartition, Seq[Int]]) extends ControllerEvent {
+
+    def state = ControllerState.PartitionReassignment
+
     override def process(): Unit = {
       if (!isActive) return
       val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
@@ -1290,46 +1276,48 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
           initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
         }
       }
-
     }
+
   }
 
   case class PartitionReassignmentIsrChange(topicAndPartition: TopicAndPartition, reassignedReplicas: Set[Int]) extends ControllerEvent {
+
+    def state = ControllerState.PartitionReassignment
+
     override def process(): Unit = {
       if (!isActive) return
-      try {
         // check if this partition is still being reassigned or not
-        controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
-          case Some(reassignedPartitionContext) =>
-            // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
-            val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition)
-            newLeaderAndIsrOpt match {
-              case Some(leaderAndIsr) => // check if new replicas have joined ISR
-                val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
-                if(caughtUpReplicas == reassignedReplicas) {
-                  // resume the partition reassignment process
-                  info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
-                    .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
-                    "Resuming partition reassignment")
-                  onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-                }
-                else {
-                  info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
-                    .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
-                    "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
-                }
-              case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
-                .format(topicAndPartition, reassignedReplicas.mkString(",")))
-            }
-          case None =>
-        }
-      } catch {
-        case e: Throwable => error("Error while handling partition reassignment", e)
+      controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
+        case Some(reassignedPartitionContext) =>
+          // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
+          val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition)
+          newLeaderAndIsrOpt match {
+            case Some(leaderAndIsr) => // check if new replicas have joined ISR
+              val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
+              if(caughtUpReplicas == reassignedReplicas) {
+                // resume the partition reassignment process
+                info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+                  .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
+                  "Resuming partition reassignment")
+                onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
+              }
+              else {
+                info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+                  .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
+                  "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
+              }
+            case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
+              .format(topicAndPartition, reassignedReplicas.mkString(",")))
+          }
+        case None =>
       }
     }
   }
 
   case class IsrChangeNotification(sequenceNumbers: Seq[String]) extends ControllerEvent {
+
+    def state = ControllerState.IsrChange
+
     override def process(): Unit = {
       if (!isActive) return
       try {
@@ -1381,6 +1369,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   case class PreferredReplicaLeaderElection(partitions: Set[TopicAndPartition]) extends ControllerEvent {
+
+    def state = ControllerState.ManualLeaderBalance
+
     override def process(): Unit = {
       if (!isActive) return
       val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
@@ -1390,13 +1381,17 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
       }
       onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
     }
+
   }
 
   case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
+
+    def state = ControllerState.AutoLeaderBalance
+
     override def process(): Unit = {
       if (!isActive) return
       try {
-        checkAndTriggerPartitionRebalance()
+        checkAndTriggerAutoLeaderRebalance()
       } finally {
         scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
       }
@@ -1404,6 +1399,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent {
+
+    def state = ControllerState.ControlledShutdown
+
     override def process(): Unit = {
       val controlledShutdownResult = Try { doControlledShutdown(id) }
       controlledShutdownCallback(controlledShutdownResult)
@@ -1473,6 +1471,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   case class TopicDeletionStopReplicaResult(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
+
+    def state = ControllerState.TopicDeletion
+
     override def process(): Unit = {
       import JavaConverters._
       if (!isActive) return
@@ -1494,57 +1495,70 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   case object Startup extends ControllerEvent {
+
+    def state = ControllerState.ControllerChange
+
     override def process(): Unit = {
       registerSessionExpirationListener()
       registerControllerChangeListener()
       elect()
     }
+
   }
 
   case class ControllerChange(newControllerId: Int) extends ControllerEvent {
+
+    def state = ControllerState.ControllerChange
+
     override def process(): Unit = {
       val wasActiveBeforeChange = isActive
-      activeControllerId.set(newControllerId)
+      activeControllerId = newControllerId
       if (wasActiveBeforeChange && !isActive) {
         onControllerResignation()
       }
     }
+
   }
 
   case object Reelect extends ControllerEvent {
+
+    def state = ControllerState.ControllerChange
+
     override def process(): Unit = {
       val wasActiveBeforeChange = isActive
-      activeControllerId.set(getControllerID())
+      activeControllerId = getControllerID()
       if (wasActiveBeforeChange && !isActive) {
         onControllerResignation()
       }
       elect()
     }
+
   }
 
   private def updateMetrics(): Unit = {
-    val opc = if (!isActive)
-      0
-    else
-      controllerContext.partitionLeadershipInfo.count(p =>
-        !controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader) &&
-          !topicDeletionManager.isTopicQueuedUpForDeletion(p._1.topic)
-      )
-    offlinePartitionCount.set(opc)
-
-    val pric = if (!isActive)
-      0
-    else
-      controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
-        controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
-        controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head &&
-          !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic)
+    offlinePartitionCount =
+      if (!isActive) 0
+      else {
+        controllerContext.partitionLeadershipInfo.count { case (tp, leadershipInfo) =>
+          !controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader) &&
+            !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
+        }
+      }
+
+    preferredReplicaImbalanceCount =
+      if (!isActive) 0
+      else {
+        controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
+          val preferredReplica = replicas.head
+          val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
+          leadershipInfo.map(_.leaderAndIsr.leader != preferredReplica).getOrElse(false) &&
+            !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic)
+        }
       }
-    preferredReplicaImbalanceCount.set(pric)
   }
 
   private def triggerControllerMove(): Unit = {
-    activeControllerId.set(-1)
+    activeControllerId = -1
     controllerContext.zkUtils.deletePath(ZkUtils.ControllerPath)
   }
 
@@ -1552,14 +1566,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     val timestamp = time.milliseconds
     val electString = ZkUtils.controllerZkData(config.brokerId, timestamp)
 
-    activeControllerId.set(getControllerID())
+    activeControllerId = getControllerID()
     /*
      * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
      * it's possible that the controller has already been elected when we get here. This check will prevent the following
      * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
      */
-    if(activeControllerId.get() != -1) {
-      debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId.get()))
+    if (activeControllerId != -1) {
+      debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
       return
     }
 
@@ -1570,15 +1584,15 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
                                                       controllerContext.zkUtils.isSecure)
       zkCheckedEphemeral.create()
       info(config.brokerId + " successfully elected as the controller")
-      activeControllerId.set(config.brokerId)
+      activeControllerId = config.brokerId
       onControllerFailover()
     } catch {
       case _: ZkNodeExistsException =>
         // If someone else has written the path, then
-        activeControllerId.set(getControllerID)
+        activeControllerId = getControllerID
 
-        if (activeControllerId.get() != -1)
-          debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId.get(), config.brokerId))
+        if (activeControllerId != -1)
+          debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
         else
           warn("A controller has been elected but just resigned, this will result in another round of election")
 
@@ -1592,23 +1606,23 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
 /**
   * This is the zookeeper listener that triggers all the state transitions for a replica
   */
-class BrokerChangeListener(controller: KafkaController) extends IZkChildListener with Logging {
+class BrokerChangeListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkChildListener with Logging {
   override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
     import JavaConverters._
-    controller.addToControllerEventQueue(controller.BrokerChange(currentChilds.asScala))
+    eventManager.put(controller.BrokerChange(currentChilds.asScala))
   }
 }
 
-class TopicChangeListener(controller: KafkaController) extends IZkChildListener with Logging {
+class TopicChangeListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkChildListener with Logging {
   override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
     import JavaConverters._
-    controller.addToControllerEventQueue(controller.TopicChange(currentChilds.asScala.toSet))
+    eventManager.put(controller.TopicChange(currentChilds.asScala.toSet))
   }
 }
 
-class PartitionModificationsListener(controller: KafkaController, topic: String) extends IZkDataListener with Logging {
+class PartitionModificationsListener(controller: KafkaController, eventManager: ControllerEventManager, topic: String) extends IZkDataListener with Logging {
   override def handleDataChange(dataPath: String, data: Any): Unit = {
-    controller.addToControllerEventQueue(controller.PartitionModifications(topic))
+    eventManager.put(controller.PartitionModifications(topic))
   }
 
   override def handleDataDeleted(dataPath: String): Unit = {}
@@ -1619,10 +1633,10 @@ class PartitionModificationsListener(controller: KafkaController, topic: String)
   * 1. Add the topic to be deleted to the delete topics cache, only if the topic exists
   * 2. If there are topics to be deleted, it signals the delete topic thread
   */
-class TopicDeletionListener(controller: KafkaController) extends IZkChildListener with Logging {
+class TopicDeletionListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkChildListener with Logging {
   override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
     import JavaConverters._
-    controller.addToControllerEventQueue(controller.TopicDeletion(currentChilds.asScala.toSet))
+    eventManager.put(controller.TopicDeletion(currentChilds.asScala.toSet))
   }
 }
 
@@ -1634,18 +1648,19 @@ class TopicDeletionListener(controller: KafkaController) extends IZkChildListene
  * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned
  * partitions.
  */
-class PartitionReassignmentListener(controller: KafkaController) extends IZkDataListener with Logging {
+class PartitionReassignmentListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkDataListener with Logging {
   override def handleDataChange(dataPath: String, data: Any): Unit = {
     val partitionReassignment = ZkUtils.parsePartitionReassignmentData(data.toString)
-    controller.addToControllerEventQueue(controller.PartitionReassignment(partitionReassignment))
+    eventManager.put(controller.PartitionReassignment(partitionReassignment))
   }
 
   override def handleDataDeleted(dataPath: String): Unit = {}
 }
 
-class PartitionReassignmentIsrChangeListener(controller: KafkaController, topic: String, partition: Int, reassignedReplicas: Set[Int]) extends IZkDataListener with Logging {
+class PartitionReassignmentIsrChangeListener(controller: KafkaController, eventManager: ControllerEventManager,
+                                             topic: String, partition: Int, reassignedReplicas: Set[Int]) extends IZkDataListener with Logging {
   override def handleDataChange(dataPath: String, data: Any): Unit = {
-    controller.addToControllerEventQueue(controller.PartitionReassignmentIsrChange(TopicAndPartition(topic, partition), reassignedReplicas))
+    eventManager.put(controller.PartitionReassignmentIsrChange(TopicAndPartition(topic, partition), reassignedReplicas))
   }
 
   override def handleDataDeleted(dataPath: String): Unit = {}
@@ -1654,10 +1669,10 @@ class PartitionReassignmentIsrChangeListener(controller: KafkaController, topic:
 /**
  * Called when replica leader initiates isr change
  */
-class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
+class IsrChangeNotificationListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkChildListener with Logging {
   override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
     import JavaConverters._
-    controller.addToControllerEventQueue(controller.IsrChangeNotification(currentChilds.asScala))
+    eventManager.put(controller.IsrChangeNotification(currentChilds.asScala))
   }
 }
 
@@ -1669,26 +1684,26 @@ object IsrChangeNotificationListener {
  * Starts the preferred replica leader election for the list of partitions specified under
  * /admin/preferred_replica_election -
  */
-class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
+class PreferredReplicaElectionListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkDataListener with Logging {
   override def handleDataChange(dataPath: String, data: Any): Unit = {
     val partitions = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
-    controller.addToControllerEventQueue(controller.PreferredReplicaLeaderElection(partitions))
+    eventManager.put(controller.PreferredReplicaLeaderElection(partitions))
   }
 
   override def handleDataDeleted(dataPath: String): Unit = {}
 }
 
-class ControllerChangeListener(controller: KafkaController) extends IZkDataListener {
+class ControllerChangeListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkDataListener {
   override def handleDataChange(dataPath: String, data: Any): Unit = {
-    controller.addToControllerEventQueue(controller.ControllerChange(KafkaController.parseControllerId(data.toString)))
+    eventManager.put(controller.ControllerChange(KafkaController.parseControllerId(data.toString)))
   }
 
   override def handleDataDeleted(dataPath: String): Unit = {
-    controller.addToControllerEventQueue(controller.Reelect)
+    eventManager.put(controller.Reelect)
   }
 }
 
-class SessionExpirationListener(controller: KafkaController) extends IZkStateListener with Logging {
+class SessionExpirationListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkStateListener with Logging {
   override def handleStateChanged(state: KeeperState) {
     // do nothing, since zkclient will do reconnect for us.
   }
@@ -1701,7 +1716,7 @@ class SessionExpirationListener(controller: KafkaController) extends IZkStateLis
     */
   @throws[Exception]
   override def handleNewSession(): Unit = {
-    controller.addToControllerEventQueue(controller.Reelect)
+    eventManager.put(controller.Reelect)
   }
 
   override def handleSessionEstablishmentError(error: Throwable): Unit = {
@@ -1731,9 +1746,16 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo
 
 private[controller] class ControllerStats extends KafkaMetricsGroup {
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
-  val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+
+  val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state =>
+    state.rateAndTimeMetricName.map { metricName =>
+      state -> new KafkaTimer(newTimer(s"$metricName", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+    }
+  }.toMap
+
 }
 
 sealed trait ControllerEvent {
+  def state: ControllerState
   def process(): Unit
 }
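
A recurring pattern in this file's diff is the replacement of AtomicInteger
fields with @volatile vars: once all mutation happens on the controller event
thread there is a single writer, so volatile visibility is sufficient for the
metrics threads that read the values, and no compare-and-set is needed. A
minimal sketch of that reasoning:

    class SingleWriterCounter {
      @volatile private var count = 0
      def update(n: Int): Unit = count = n   // called only from the event thread
      def value: Int = count                 // safe to read from any thread
    }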

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 42db26b..54bbb89 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -74,7 +74,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
               throw new NoReplicaOnlineException(s"No replica for partition $topicAndPartition is alive. Live " +
                 s"brokers are: [${controllerContext.liveBrokerIds}]. Assigned replicas are: [$assignedReplicas].")
             } else {
-              controllerContext.controllerStats.uncleanLeaderElectionRate.mark()
+              controllerContext.stats.uncleanLeaderElectionRate.mark()
               val newLeader = liveAssignedReplicas.head
               warn(s"No broker in ISR is alive for $topicAndPartition. Elect leader $newLeader from live " +
                 s"brokers ${liveAssignedReplicas.mkString(",")}. There's potential data loss.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index b1f98b5..4920a6b 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -57,7 +57,7 @@ import scala.collection.{Set, mutable}
  *    it marks the topic for deletion retry.
  * @param controller
  */
-class TopicDeletionManager(controller: KafkaController) extends Logging {
+class TopicDeletionManager(controller: KafkaController, eventManager: ControllerEventManager) extends Logging {
   this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
   val controllerContext = controller.controllerContext
   val partitionStateMachine = controller.partitionStateMachine
@@ -298,8 +298,9 @@ class TopicDeletionManager(controller: KafkaController) extends Logging {
       replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
       debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
       controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
-        new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) => controller.addToControllerEventQueue(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build)
-      if(deadReplicasForTopic.nonEmpty) {
+        new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) =>
+          eventManager.put(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build)
+      if (deadReplicasForTopic.nonEmpty) {
         debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
         markTopicIneligibleForDeletion(Set(topic))
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/metrics/KafkaTimer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaTimer.scala b/core/src/main/scala/kafka/metrics/KafkaTimer.scala
index 7f76f2d..24b54d6 100644
--- a/core/src/main/scala/kafka/metrics/KafkaTimer.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaTimer.scala
@@ -28,12 +28,8 @@ class KafkaTimer(metric: Timer) {
 
   def time[A](f: => A): A = {
     val ctx = metric.time
-    try {
-      f
-    }
-    finally {
-      ctx.stop()
-    }
+    try f
+    finally ctx.stop()
   }
 }
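
The tightened `time` body behaves exactly as before: it returns the block's result, and the `finally` stops the timer context even when the block throws, which is why the failing event in the new ControllerEventManagerTest below still advances its timer. A small usage sketch; the registry call and metric name are illustrative, not taken from this patch:

    import java.util.concurrent.TimeUnit

    import com.yammer.metrics.Metrics
    import kafka.metrics.KafkaTimer

    // Hypothetical timer, registered directly against the default registry.
    val timer = new KafkaTimer(Metrics.defaultRegistry.newTimer(
      classOf[KafkaTimer], "IllustrativeRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))

    val answer: Int = timer.time {
      1 + 41 // elapsed time is recorded even if this block threw
    }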
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7310a06..fedbafb 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -171,8 +171,9 @@ class ReplicaManager(val config: KafkaConfig,
       def value = underReplicatedPartitionCount
     }
   )
-  val isrExpandRate = newMeter("IsrExpandsPerSec",  "expands", TimeUnit.SECONDS)
-  val isrShrinkRate = newMeter("IsrShrinksPerSec",  "shrinks", TimeUnit.SECONDS)
+  val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
+  val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
+  val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
 
   def underReplicatedPartitionCount: Int =
     getLeaderPartitions.count(_.isUnderReplicated)
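
`FailedIsrUpdatesPerSec` is registered through the same KafkaMetricsGroup mechanism as the two meters above it. A hedged sketch of locating the meter from test or tooling code, using the registry-scan idiom the new tests also use; the MBean name is an assumption derived from the group's kafka.server:type=ReplicaManager naming convention:

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Meter

    import scala.collection.JavaConverters._

    // Find the new meter by its assumed MBean name, if it has been registered.
    def failedIsrUpdatesMeter(): Option[Meter] =
      Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
        case (name, meter: Meter)
          if name.getMBeanName == "kafka.server:type=ReplicaManager,name=FailedIsrUpdatesPerSec" => meter
      }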

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
new file mode 100644
index 0000000..fccb566
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Timer
+import kafka.utils.TestUtils
+import org.easymock.{EasyMock, IAnswer}
+import org.junit.{After, Test}
+import org.junit.Assert.{assertEquals, fail}
+
+import scala.collection.JavaConverters._
+
+class ControllerEventManagerTest {
+
+  private var controllerEventManager: ControllerEventManager = _
+
+  @After
+  def tearDown(): Unit = {
+    if (controllerEventManager != null)
+      controllerEventManager.close()
+  }
+
+  @Test
+  def testSuccessfulEvent(): Unit = {
+    check("kafka.controller:type=ControllerStats,name=AutoLeaderBalanceRateAndTimeMs", ControllerState.AutoLeaderBalance,
+      () => ())
+  }
+
+  @Test
+  def testEventThatThrowsException(): Unit = {
+    check("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs", ControllerState.BrokerChange,
+      () => throw new NullPointerException)
+  }
+
+  private def check(metricName: String, controllerState: ControllerState, process: () => Unit): Unit = {
+    val controllerStats = new ControllerStats
+    val eventProcessedListenerCount = new AtomicInteger
+    controllerEventManager = new ControllerEventManager(controllerStats.rateAndTimeMetrics,
+      _ => eventProcessedListenerCount.incrementAndGet)
+    controllerEventManager.start()
+
+    val initialTimerCount = timer(metricName).count
+
+    // `ControllerEvent` is sealed so we use EasyMock to create a subclass
+    val eventMock = EasyMock.createMock(classOf[ControllerEvent])
+    EasyMock.expect(eventMock.state).andReturn(controllerState)
+
+    // Only return from `process()` once we have checked `controllerEventManager.state`
+    val latch = new CountDownLatch(1)
+    EasyMock.expect(eventMock.process()).andAnswer(new IAnswer[Unit]() {
+      def answer(): Unit = {
+        latch.await()
+        process()
+      }
+    })
+
+    EasyMock.replay(eventMock)
+
+    controllerEventManager.put(eventMock)
+    TestUtils.waitUntilTrue(() => controllerEventManager.state == controllerState,
+      s"Controller state is not $controllerState")
+    latch.countDown()
+
+    TestUtils.waitUntilTrue(() => controllerEventManager.state == ControllerState.Idle,
+      "Controller state has not changed back to Idle")
+    assertEquals(1, eventProcessedListenerCount.get)
+
+    assertEquals("Timer has not been updated", initialTimerCount + 1, timer(metricName).count)
+  }
+
+  private def timer(metricName: String): Timer = {
+    Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.headOption
+      .getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Timer]
+  }
+
+}
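
The test pins down the manager's observable contract: `put` enqueues, a single thread drains the queue, `state` reflects the in-flight event and returns to `ControllerState.Idle` afterwards, the matching timer is updated, and the processed-event listener fires exactly once even when `process()` throws. A simplified sketch consistent with that contract; this is not the actual ControllerEventManager.scala, which the excerpt above omits:

    import java.util.concurrent.LinkedBlockingQueue

    import kafka.metrics.KafkaTimer

    // Sketch of the contract the test exercises; shutdown and error handling
    // are reduced to the minimum needed to keep the loop alive.
    class EventManagerSketch(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
                             eventProcessedListener: ControllerEvent => Unit) {
      @volatile var state: ControllerState = ControllerState.Idle
      private val queue = new LinkedBlockingQueue[ControllerEvent]()
      private val thread = new Thread(new Runnable {
        override def run(): Unit = while (true) {
          val event = queue.take()
          state = event.state
          try {
            // Time the event only if its state has a registered timer.
            rateAndTimeMetrics.get(state) match {
              case Some(timer) => timer.time(event.process())
              case None => event.process()
            }
          } catch {
            case _: Throwable => // the real manager logs; swallowing keeps the loop alive
          }
          eventProcessedListener(event) // fires even for failed events
          state = ControllerState.Idle
        }
      })
      def start(): Unit = thread.start()
      def put(event: ControllerEvent): Unit = queue.put(event)
    }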

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 1837ba2..2df93c7 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -17,12 +17,17 @@
 
 package kafka.controller
 
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Timer
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.{After, Before, Test}
+import org.junit.Assert.{assertTrue, fail}
+
+import scala.collection.JavaConverters._
 
 class ControllerIntegrationTest extends ZooKeeperTestHarness {
   var servers = Seq.empty[KafkaServer]
@@ -129,6 +134,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   def testPartitionReassignment(): Unit = {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+
+    val metricName = s"kafka.controller:type=ControllerStats,name=${ControllerState.PartitionReassignment.rateAndTimeMetricName.get}"
+    val timerCount = timer(metricName).count
+
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     val tp = TopicAndPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(controllerId))
@@ -141,6 +150,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
       "failed to get updated partition assignment on topic znode after partition reassignment")
     TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ReassignPartitionsPath),
       "failed to remove reassign partitions path after completion")
+
+    val updatedTimerCount = timer(metricName).count
+    assertTrue(s"Timer count $updatedTimerCount should be greater than $timerCount", updatedTimerCount > timerCount)
   }
 
   @Test
@@ -312,4 +324,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     }
     configs.map(config => TestUtils.createServer(KafkaConfig.fromProps(config)))
   }
-}
\ No newline at end of file
+
+  private def timer(metricName: String): Timer = {
+    Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.headOption
+      .getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Timer]
+  }
+
+}
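
The reassignment check above only asserts that the timer advanced between two reads. If the controller thread could still be mid-event at the second read, a polling variant is safer; a sketch reusing the `TestUtils.waitUntilTrue` helper and the `timer` lookup this file now defines (the helper name is illustrative):

    // Block until the named ControllerStats timer has recorded more events
    // than the `before` snapshot.
    private def waitForTimerIncrease(metricName: String, before: Long): Unit =
      TestUtils.waitUntilTrue(() => timer(metricName).count > before,
        s"Timer $metricName did not advance past $before")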

http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 19a0f9d..60a6fb6 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -56,13 +56,9 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
     // Do some Metrics Registry cleanup by removing the metrics that this test checks. 
     // This is a test workaround to the issue that prior harness runs may have left a populated registry.
     // see https://issues.apache.org/jira/browse/KAFKA-4605
-    for (m <- (testedMetrics)) {
-        Metrics.defaultRegistry.allMetrics.asScala
-        .filterKeys(k => k.getName.endsWith(m))
-        .headOption match {
-           case Some(e) => Metrics.defaultRegistry.removeMetric(e._1)
-           case None =>
-        }
+    for (m <- testedMetrics) {
+        val metricName = Metrics.defaultRegistry.allMetrics.asScala.keys.find(_.getName.endsWith(m))
+        metricName.foreach(Metrics.defaultRegistry.removeMetric)
     }
     
     super.setUp

