kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: MINOR: onControllerResignation should be invoked if triggerControllerMove is called
Date Tue, 30 May 2017 23:59:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b38b74bb7 -> 6021618f9


MINOR: onControllerResignation should be invoked if triggerControllerMove is called

Also update the test to be simpler, since a mock event lets us simulate the issue
more easily (thanks to Jun for the suggestion). This should fix two issues:

1. A transient test failure due to an NPE in ControllerFailoverTest.testMetadataUpdate:

```text
Caused by: java.lang.NullPointerException
	at kafka.controller.ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(ControllerChannelManager.scala:338)
	at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:975)
	at kafka.controller.ControllerFailoverTest.testMetadataUpdate(ControllerFailoverTest.scala:141)
```

The test was creating an additional thread without doing the appropriate
synchronization (perhaps this became more of an issue after we changed the
Controller to be single-threaded and changed the locking).
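For illustration only, here is a hypothetical sketch (not Kafka code) of the coordination style the reworked test uses: the simulated event blocks on a `CountDownLatch` until the test has finished its checks, instead of spawning an extra thread with ad-hoc synchronization:

```scala
// Hypothetical sketch of latch-based test coordination; names are illustrative.
import java.util.concurrent.CountDownLatch

val latch = new CountDownLatch(1)
@volatile var sideEffectSeen = false

val eventThread = new Thread(() => {
  sideEffectSeen = true // the side effect the test wants to observe
  latch.await()         // hold the "event" open until the test releases it
})
eventThread.start()

while (!sideEffectSeen) Thread.sleep(1) // the test waits for the side effect
// ... assertions would run here, while the event is still in flight ...
latch.countDown()                       // release the event
eventThread.join()
```

The latch makes the happens-before ordering explicit, so no thread interrupts or sleeps-as-synchronization are needed.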

2. Setting `activeControllerId.set(-1)` in `triggerControllerMove` causes `Reelect` not to
invoke `onControllerResignation`. Among other things, this causes an `IllegalStateException`
to be thrown when `KafkaScheduler.startup` is invoked for the second time without the corresponding
`shutdown`. We now simply call `onControllerResignation` as part of `triggerControllerMove`.
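The failure mode in issue 2 can be sketched with a minimal, self-contained example (hypothetical `Toy*` names, not Kafka's actual classes): a scheduler started twice without an intervening shutdown throws, so `triggerControllerMove` must run the full resignation callback first:

```scala
// Hypothetical sketch of issue 2; Toy* classes stand in for KafkaScheduler/KafkaController.
class ToyScheduler {
  private var started = false
  def startup(): Unit = {
    if (started) throw new IllegalStateException("scheduler already started")
    started = true
  }
  def shutdown(): Unit = started = false
  def isStarted: Boolean = started
}

class ToyController(val scheduler: ToyScheduler) {
  var activeControllerId: Int = -1
  def onControllerFailover(): Unit = { scheduler.startup(); activeControllerId = 0 }
  def onControllerResignation(): Unit = scheduler.shutdown()
  // The fix: resign fully (shutting the scheduler down) before abandoning the
  // controller path, instead of only setting activeControllerId = -1.
  def triggerControllerMove(): Unit = {
    onControllerResignation()
    activeControllerId = -1
  }
}

val controller = new ToyController(new ToyScheduler)
controller.onControllerFailover()  // elected: scheduler starts
controller.triggerControllerMove() // resigns cleanly, scheduler shut down
controller.onControllerFailover()  // re-elected: startup() succeeds again
```

Without the `onControllerResignation()` call inside `triggerControllerMove`, the second `onControllerFailover()` would hit the already-started scheduler and throw.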

Finally, I included a few clean-ups:

1. No longer update the broker state in `onControllerFailover`. This is no longer needed
since we removed the `RunningAsController` state (KAFKA-3761).
2. Trivial clean-ups in KafkaController
3. Removed unused parameter in `ZkUtils.getPartitionLeaderAndIsrForTopics`

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #2935 from ijuma/on-controller-resignation-if-trigger-controller-move


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6021618f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6021618f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6021618f

Branch: refs/heads/trunk
Commit: 6021618f9dafa3478104575d307e7bcd2cb4cca9
Parents: b38b74b
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue May 30 16:59:33 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue May 30 16:59:33 2017 -0700

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   |   4 +-
 .../kafka/controller/KafkaController.scala      | 113 +++++++-------
 .../controller/PartitionStateMachine.scala      |   6 +-
 .../kafka/controller/ReplicaStateMachine.scala  |   4 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   3 +-
 .../controller/ControllerEventManagerTest.scala |  15 +-
 .../controller/ControllerFailoverTest.scala     | 148 +++++--------------
 .../controller/ControllerIntegrationTest.scala  |   6 +-
 .../kafka/controller/ControllerTestUtils.scala  |  35 +++++
 10 files changed, 137 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6021618f/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ea8d13b..8f98a8c 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -381,7 +381,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
     try {
       leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
         partitionStateInfos.foreach { case (topicPartition, state) =>
-          val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
+          val typeOfRequest =
+            if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader"
+            else "become-follower"
           stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
                                    "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
                                                                    state.leaderIsrAndControllerEpoch, broker,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6021618f/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index dbce485..956d1ca 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -146,18 +146,20 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private val stateChangeLogger = KafkaController.stateChangeLogger
   val controllerContext = new ControllerContext(zkUtils)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
 
-  // have a separate scheduler for the controller to be able to start and stop independently of the
-  // kafka server
-  private val kafkaScheduler = new KafkaScheduler(1)
+  // have a separate scheduler for the controller to be able to start and stop independently of the kafka server
+  // visible for testing
+  private[controller] val kafkaScheduler = new KafkaScheduler(1)
 
-  private val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics())
+  // visible for testing
+  private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
+    _ => updateMetrics())
 
   val topicDeletionManager = new TopicDeletionManager(this, eventManager)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
@@ -290,7 +292,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   /**
    * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
    * required to clean up internal controller data structures
-   * Note:We need to resign as a controller out of the controller lock to avoid potential deadlock issue
    */
   def onControllerResignation() {
     debug("Controller resigning, broker id %d".format(config.brokerId))
@@ -318,9 +319,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     replicaStateMachine.shutdown()
     deregisterBrokerChangeListener()
 
-    // reset controller context
     resetControllerContext()
-    brokerState.newState(RunningAsBroker)
 
     info("Broker %d resigned as the controller".format(config.brokerId))
   }
@@ -746,18 +745,15 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
-    val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(zkUtils.zkClient, topicAndPartitions)
-    for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
+    val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(topicAndPartitions)
+    for ((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
       controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
   }
 
   private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
-    zkUtils.getLeaderAndIsrForPartition(topic, partition) match {
-      case Some(leaderAndIsr) =>
-        val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r))
-        replicasNotInIsr.isEmpty
-      case None => false
-    }
+    zkUtils.getLeaderAndIsrForPartition(topic, partition).map { leaderAndIsr =>
+      replicas.forall(leaderAndIsr.isr.contains)
+    }.getOrElse(false)
   }
 
   private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
@@ -824,22 +820,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
-    brokerRequestBatch.newBatch()
     updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
       case Some(updatedLeaderIsrAndControllerEpoch) =>
         try {
+          brokerRequestBatch.newBatch()
           brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
             topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
           brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
         } catch {
-          case e : IllegalStateException => {
-            // Resign if the controller is in an illegal state
-            error("Forcing the controller to resign")
-            brokerRequestBatch.clear()
-            triggerControllerMove()
-
-            throw e
-          }
+          case e: IllegalStateException =>
+            handleIllegalState(e)
         }
         stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " +
           "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch,
@@ -986,14 +976,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
       brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
       brokerRequestBatch.sendRequestsToBrokers(epoch)
     } catch {
-      case e : IllegalStateException => {
-        // Resign if the controller is in an illegal state
-        error("Forcing the controller to resign")
-        brokerRequestBatch.clear()
-        triggerControllerMove()
-
-        throw e
-      }
+      case e: IllegalStateException =>
+        handleIllegalState(e)
     }
   }
 
@@ -1425,39 +1409,32 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
           controllerContext.partitionsOnBroker(id)
             .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
 
-      allPartitionsAndReplicationFactorOnBroker.foreach {
-        case(topicAndPartition, replicationFactor) =>
-          controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
-            if (replicationFactor > 1) {
-              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
-                // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
-                // notifies all affected brokers
-                partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
-                  controlledShutdownPartitionLeaderSelector)
-              } else {
-                // Stop the replica first. The state change below initiates ZK changes which should take some time
-                // before which the stop replica request should be completed (in most cases)
-                try {
-                  brokerRequestBatch.newBatch()
-                  brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
-                    topicAndPartition.partition, deletePartition = false)
-                  brokerRequestBatch.sendRequestsToBrokers(epoch)
-                } catch {
-                  case e : IllegalStateException => {
-                    // Resign if the controller is in an illegal state
-                    error("Forcing the controller to resign")
-                    brokerRequestBatch.clear()
-                    triggerControllerMove()
-
-                    throw e
-                  }
-                }
-                // If the broker is a follower, updates the isr in ZK and notifies the current leader
-                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
-                  topicAndPartition.partition, id)), OfflineReplica)
+      allPartitionsAndReplicationFactorOnBroker.foreach { case (topicAndPartition, replicationFactor) =>
+        controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
+          if (replicationFactor > 1) {
+            if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
+              // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
+              // notifies all affected brokers
+              partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
+                controlledShutdownPartitionLeaderSelector)
+            } else {
+              // Stop the replica first. The state change below initiates ZK changes which should take some time
+              // before which the stop replica request should be completed (in most cases)
+              try {
+                brokerRequestBatch.newBatch()
+                brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
+                  topicAndPartition.partition, deletePartition = false)
+                brokerRequestBatch.sendRequestsToBrokers(epoch)
+              } catch {
+                case e: IllegalStateException =>
+                  handleIllegalState(e)
               }
+              // If the broker is a follower, updates the isr in ZK and notifies the current leader
+              replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
+                topicAndPartition.partition, id)), OfflineReplica)
             }
           }
+        }
       }
       def replicatedPartitionsBrokerLeads() = {
         trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
@@ -1557,7 +1534,17 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
       }
   }
 
+  // visible for testing
+  private[controller] def handleIllegalState(e: IllegalStateException): Nothing = {
+    // Resign if the controller is in an illegal state
+    error("Forcing the controller to resign")
+    brokerRequestBatch.clear()
+    triggerControllerMove()
+    throw e
+  }
+
   private def triggerControllerMove(): Unit = {
+    onControllerResignation()
     activeControllerId = -1
     controllerContext.zkUtils.deletePath(ZkUtils.ControllerPath)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6021618f/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 4cffc13..5751e17 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -79,9 +79,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       brokerRequestBatch.newBatch()
       // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
       // that belong to topics to be deleted
-      for((topicAndPartition, partitionState) <- partitionState
+      for ((topicAndPartition, partitionState) <- partitionState
           if !controller.topicDeletionManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
-        if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
+        if (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
           handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
                             (new CallbackBuilder).build)
       }
@@ -111,7 +111,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
       }
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
-    }catch {
+    } catch {
       case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
       // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6021618f/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 0759ed4..60b9990 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -81,13 +81,13 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    */
   def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                          callbacks: Callbacks = (new CallbackBuilder).build) {
-    if(replicas.nonEmpty) {
+    if (replicas.nonEmpty) {
       info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
       try {
         brokerRequestBatch.newBatch()
         replicas.foreach(r => handleStateChange(r, targetState, callbacks))
         brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
-      }catch {
+      } catch {
         case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6021618f/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c7dac0d..0a87750 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -226,7 +226,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         replicaManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
+        kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
         kafkaController.startup()
 
         adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6021618f/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index fc78501..ac497c4 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -701,8 +701,7 @@ class ZkUtils(val zkClient: ZkClient,
     cluster
   }
 
-  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: Set[TopicAndPartition])
-  : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+  def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     for(topicAndPartition <- topicAndPartitions) {
       ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6021618f/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
index 727c4f3..ec9343e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Timer
 import kafka.utils.TestUtils
-import org.easymock.{EasyMock, IAnswer}
 import org.junit.{After, Test}
 import org.junit.Assert.{assertEquals, fail}
 
@@ -60,21 +59,13 @@ class ControllerEventManagerTest {
 
     val initialTimerCount = timer(metricName).count
 
-    // `ControllerEvent` is sealed so we use EasyMock to create a subclass
-    val eventMock = EasyMock.createMock(classOf[ControllerEvent])
-    EasyMock.expect(eventMock.state).andReturn(controllerState)
-
     // Only return from `process()` once we have checked `controllerEventManager.state`
     val latch = new CountDownLatch(1)
-    EasyMock.expect(eventMock.process()).andAnswer(new IAnswer[Unit]() {
-      def answer(): Unit = {
-        latch.await()
-        process()
-      }
+    val eventMock = ControllerTestUtils.createMockControllerEvent(controllerState, { () =>
+      latch.await()
+      process()
     })
 
-    EasyMock.replay(eventMock)
-
     controllerEventManager.put(eventMock)
     TestUtils.waitUntilTrue(() => controllerEventManager.state == controllerState,
       s"Controller state is not $controllerState")

http://git-wip-us.apache.org/repos/asf/kafka/blob/6021618f/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 83a315f..7a91bef 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -18,18 +18,17 @@
 package kafka.controller
 
 import java.util.Properties
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.CountDownLatch
 
+import kafka.admin.AdminUtils
 import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.utils.Time
-import org.apache.log4j.{Level, Logger}
-import org.junit.{After, Ignore, Test}
-
-import scala.collection.mutable
+import org.apache.log4j.Logger
+import org.junit.{After, Test}
+import org.junit.Assert._
 
 class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
   val log = Logger.getLogger(classOf[ControllerFailoverTest])
@@ -54,119 +53,44 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
    * See @link{https://issues.apache.org/jira/browse/KAFKA-2300}
    * for the background of this test case
    */
-  @Ignore // This needs to be reworked as described here: https://github.com/apache/kafka/pull/2935#discussion_r114374412
   @Test
-  def testMetadataUpdate() {
-    log.setLevel(Level.INFO)
-    var controller: KafkaServer = this.servers.head
-    // Find the current controller
-    val epochMap: mutable.Map[Int, Int] = mutable.Map.empty
-    for (server <- this.servers) {
-      epochMap += (server.config.brokerId -> server.kafkaController.epoch)
-      if(server.kafkaController.isActive) {
-        controller = server
-      }
+  def testHandleIllegalStateException() {
+    val initialController = servers.find(_.kafkaController.isActive).map(_.kafkaController).getOrElse {
+      fail("Could not find controller")
     }
+    val initialEpoch = initialController.epoch
     // Create topic with one partition
-    kafka.admin.AdminUtils.createTopic(controller.zkUtils, topic, 1, 1)
+    AdminUtils.createTopic(servers.head.zkUtils, topic, 1, 1)
     val topicPartition = TopicAndPartition("topic1", 0)
-    var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition)
-    while (!partitions.contains(topicPartition)) {
-      partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition)
-      Thread.sleep(100)
-    }
-    // Replace channel manager with our mock manager
-    controller.kafkaController.controllerContext.controllerChannelManager.shutdown()
-    val channelManager = new MockChannelManager(controller.kafkaController.controllerContext,
-                                                  controller.kafkaController.config, metrics)
-    channelManager.startup()
-    controller.kafkaController.controllerContext.controllerChannelManager = channelManager
-    channelManager.shrinkBlockingQueue(0)
-    channelManager.stopSendThread(0)
-    // Spawn a new thread to block on the outgoing channel
-    // queue
-    val thread = new Thread(new Runnable {
-      def run() {
-        try {
-          controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
-          log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0)))
-          controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
-          log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0)))
-        } catch {
-          case _: Exception => log.info("Thread interrupted")
-        }
+    TestUtils.waitUntilTrue(() =>
+      initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition),
+      s"Partition $topicPartition did not transition to online state")
+
+    // Wait until we have verified that we have resigned
+    val latch = new CountDownLatch(1)
+    @volatile var expectedExceptionThrown = false
+    @volatile var unexpectedExceptionThrown: Option[Throwable] = None
+    val illegalStateEvent = ControllerTestUtils.createMockControllerEvent(ControllerState.BrokerChange, { () =>
+      try initialController.handleIllegalState(new IllegalStateException("Thrown for test purposes"))
+      catch {
+        case _: IllegalStateException => expectedExceptionThrown = true
+        case t: Throwable => unexpectedExceptionThrown = Some(t)
       }
+      latch.await()
     })
-    thread.setName("mythread")
-    thread.start()
-    while (thread.getState() != Thread.State.WAITING) {
-      Thread.sleep(100)
-    }
-    // Assume that the thread is WAITING because it is
-    // blocked on the queue, so interrupt and move forward
-    thread.interrupt()
-    thread.join()
-    channelManager.resumeSendThread(0)
-    // Wait and find current controller
-    var found = false
-    var counter = 0
-    while (!found && counter < 10) {
-      for (server <- this.servers) {
-        val previousEpoch = epochMap get server.config.brokerId match {
-          case Some(epoch) =>
-            epoch
-          case None =>
-            val msg = String.format("Missing element in epoch map %s", epochMap.mkString(", "))
-            throw new IllegalStateException(msg)
-        }
+    initialController.eventManager.put(illegalStateEvent)
+    // Check that we have shutdown the scheduler (via onControllerResigned)
+    TestUtils.waitUntilTrue(() => !initialController.kafkaScheduler.isStarted, "Scheduler was not shutdown")
+    TestUtils.waitUntilTrue(() => !initialController.isActive, "Controller did not become inactive")
+    latch.countDown()
+    assertTrue("IllegalStateException was not thrown", expectedExceptionThrown)
+    assertEquals("Unexpected exception thrown", None, unexpectedExceptionThrown)
 
-        if (server.kafkaController.isActive
-            && previousEpoch < server.kafkaController.epoch) {
-          controller = server
-          found = true
-        }
+    TestUtils.waitUntilTrue(() => {
+      servers.exists { server =>
+        server.kafkaController.isActive && server.kafkaController.epoch > initialController.epoch
       }
-      if (!found) {
-          Thread.sleep(100)
-          counter += 1
-      }
-    }
-    // Give it a shot to make sure that sending isn't blocking
-    try {
-      controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
-    } catch {
-      case e : Throwable => {
-        fail(e)
-      }
-    }
-  }
-}
-
-class MockChannelManager(private val controllerContext: ControllerContext, config: KafkaConfig, metrics: Metrics)
-  extends ControllerChannelManager(controllerContext, config, Time.SYSTEM, metrics) {
-
-  def stopSendThread(brokerId: Int) {
-    val requestThread = brokerStateInfo(brokerId).requestSendThread
-    requestThread.isRunning.set(false)
-    requestThread.interrupt
-    requestThread.join
-  }
-
-  def shrinkBlockingQueue(brokerId: Int) {
-    val messageQueue = new LinkedBlockingQueue[QueueItem](1)
-    val brokerInfo = this.brokerStateInfo(brokerId)
-    this.brokerStateInfo.put(brokerId, brokerInfo.copy(messageQueue = messageQueue))
-  }
-
-  def resumeSendThread (brokerId: Int) {
-    this.startRequestSendThread(0)
-  }
-
-  def queueCapacity(brokerId: Int): Int = {
-    this.brokerStateInfo(brokerId).messageQueue.remainingCapacity
-  }
+    }, "Failed to find controller")
 
-  def queueSize(brokerId: Int): Int = {
-    this.brokerStateInfo(brokerId).messageQueue.size
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6021618f/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 2df93c7..d5f2fe0 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -264,7 +264,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
     TestUtils.waitUntilTrue(() => {
-      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
         isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
         leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId)
@@ -284,7 +284,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers(1).shutdown()
     servers(1).awaitShutdown()
     TestUtils.waitUntilTrue(() => {
-      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
         isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
         leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List.empty
@@ -301,7 +301,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
                                     leaderEpoch: Int,
                                     message: String): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
         isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), controllerEpoch, leader, leaderEpoch)
     }, message)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6021618f/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
new file mode 100644
index 0000000..407297a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import org.easymock.{EasyMock, IAnswer}
+
+object ControllerTestUtils {
+
+  /** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */
+  def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = {
+    val mockEvent = EasyMock.createMock(classOf[ControllerEvent])
+    EasyMock.expect(mockEvent.state).andReturn(controllerState)
+    EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() {
+      def answer(): Unit = {
+        process()
+      }
+    })
+    EasyMock.replay(mockEvent)
+    mockEvent
+  }
+}

