kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/5] kafka git commit: KAFKA-5642; Use async ZookeeperClient in Controller
Date Wed, 18 Oct 2017 16:15:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 68f324f4b -> b71ee043f


http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
new file mode 100644
index 0000000..1214344
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
+import kafka.log.LogConfig
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.data.Stat
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+import scala.collection.mutable
+
+class PartitionStateMachineTest extends JUnitSuite {
+  private var controllerContext: ControllerContext = null
+  private var mockZkUtils: KafkaControllerZkUtils = null
+  private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
+  private var mockTopicDeletionManager: TopicDeletionManager = null
+  private var partitionState: mutable.Map[TopicAndPartition, PartitionState] = null
+  private var partitionStateMachine: PartitionStateMachine = null
+
+  private val brokerId = 5
+  private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
+  private val controllerEpoch = 50
+  private val partition = TopicAndPartition("t", 0)
+  private val partitions = Seq(partition)
+
+  @Before
+  def setUp(): Unit = {
+    controllerContext = new ControllerContext
+    controllerContext.epoch = controllerEpoch
+    mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils])
+    mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
+    mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
+    partitionState = mutable.Map.empty[TopicAndPartition, PartitionState]
+    partitionStateMachine = new PartitionStateMachine(config, new StateChangeLogger(brokerId,
true, None), controllerContext, mockTopicDeletionManager,
+      mockZkUtils, partitionState, mockControllerBrokerRequestBatch)
+  }
+
+  @Test
+  def testNonexistentPartitionToNewPartitionTransition(): Unit = {
+    partitionStateMachine.handleStateChanges(partitions, NewPartition)
+    assertEquals(NewPartition, partitionState(partition))
+  }
+
+  @Test
+  def testInvalidNonexistentPartitionToOnlinePartitionTransition(): Unit = {
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    assertEquals(NonExistentPartition, partitionState(partition))
+  }
+
+  @Test
+  def testInvalidNonexistentPartitionToOfflinePartitionTransition(): Unit = {
+    partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
+    assertEquals(NonExistentPartition, partitionState(partition))
+  }
+
+  @Test
+  def testNewPartitionToOnlinePartitionTransition(): Unit = {
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    partitionState.put(partition, NewPartition)
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId,
List(brokerId)), controllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
+      .andReturn(Seq(CreateResponse(Code.OK.intValue(), null, partition, null)))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew
= true))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(OnlinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates(): Unit
= {
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    partitionState.put(partition, NewPartition)
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId,
List(brokerId)), controllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
+      .andThrow(new ZookeeperClientException("test"))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(NewPartition, partitionState(partition))
+  }
+
+  @Test
+  def testNewPartitionToOnlinePartitionTransitionErrorCodeFromCreateStates(): Unit = {
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    partitionState.put(partition, NewPartition)
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId,
List(brokerId)), controllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
+      .andReturn(Seq(CreateResponse(Code.NODEEXISTS.intValue(), null, partition, null)))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(NewPartition, partitionState(partition))
+  }
+
+  @Test
+  def testNewPartitionToOfflinePartitionTransition(): Unit = {
+    partitionState.put(partition, NewPartition)
+    partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
+    assertEquals(OfflinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testInvalidNewPartitionToNonexistentPartitionTransition(): Unit = {
+    partitionState.put(partition, NewPartition)
+    partitionStateMachine.handleStateChanges(partitions, NonExistentPartition)
+    assertEquals(NewPartition, partitionState(partition))
+  }
+
+  @Test
+  def testOnlinePartitionToOnlineTransition(): Unit = {
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    partitionState.put(partition, OnlinePartition)
+    val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+      .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition,
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+
+    val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
+    val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
+    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection),
controllerEpoch))
+      .andReturn((Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr,
controllerEpoch),
+      Seq(brokerId), isNew = false))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(OnlinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testOnlinePartitionToOnlineTransitionForControlledShutdown(): Unit = {
+    val otherBrokerId = brokerId + 1
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0), TestUtils.createBroker(otherBrokerId,
"host", 0))
+    controllerContext.shuttingDownBrokerIds.add(brokerId)
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId, otherBrokerId))
+    partitionState.put(partition, OnlinePartition)
+    val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId, otherBrokerId))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+      .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition,
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+
+    val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId))
+    val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
+    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection),
controllerEpoch))
+      .andReturn((Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
+      partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr,
controllerEpoch),
+      Seq(brokerId, otherBrokerId), isNew = false))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(ControlledShutdownPartitionLeaderElectionStrategy))
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(OnlinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testOnlinePartitionToOfflineTransition(): Unit = {
+    partitionState.put(partition, OnlinePartition)
+    partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
+    assertEquals(OfflinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testInvalidOnlinePartitionToNonexistentPartitionTransition(): Unit = {
+    partitionState.put(partition, OnlinePartition)
+    partitionStateMachine.handleStateChanges(partitions, NonExistentPartition)
+    assertEquals(OnlinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testInvalidOnlinePartitionToNewPartitionTransition(): Unit = {
+    partitionState.put(partition, OnlinePartition)
+    partitionStateMachine.handleStateChanges(partitions, NewPartition)
+    assertEquals(OnlinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testOfflinePartitionToOnlinePartitionTransition(): Unit = {
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    partitionState.put(partition, OfflinePartition)
+    val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+      .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition, TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch),
stat)))
+
+    EasyMock.expect(mockZkUtils.getLogConfigs(Seq.empty, config.originals()))
+      .andReturn((Map(partition.topic -> LogConfig()), Map.empty))
+    val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
+    val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
+    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection),
controllerEpoch))
+      .andReturn((Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr,
controllerEpoch), Seq(brokerId), isNew = false))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(OnlinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testOfflinePartitionToOnlinePartitionTransitionZkUtilsExceptionFromStateLookup(): Unit
= {
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    partitionState.put(partition, OfflinePartition)
+    val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+      .andThrow(new ZookeeperClientException(""))
+
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(OfflinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testOfflinePartitionToOnlinePartitionTransitionErrorCodeFromStateLookup(): Unit = {
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    partitionState.put(partition, OfflinePartition)
+    val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+      .andReturn(Seq(GetDataResponse(Code.NONODE.intValue(), null, partition, TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch),
stat)))
+
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(OfflinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testOfflinePartitionToNonexistentPartitionTransition(): Unit = {
+    partitionState.put(partition, OfflinePartition)
+    partitionStateMachine.handleStateChanges(partitions, NonExistentPartition)
+    assertEquals(NonExistentPartition, partitionState(partition))
+  }
+
+  @Test
+  def testInvalidOfflinePartitionToNewPartitionTransition(): Unit = {
+    partitionState.put(partition, OfflinePartition)
+    partitionStateMachine.handleStateChanges(partitions, NewPartition)
+    assertEquals(OfflinePartition, partitionState(partition))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
new file mode 100644
index 0000000..62c28a0
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.data.Stat
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+import scala.collection.mutable
+
+class ReplicaStateMachineTest extends JUnitSuite {
+  private var controllerContext: ControllerContext = null
+  private var mockZkUtils: KafkaControllerZkUtils = null
+  private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
+  private var mockTopicDeletionManager: TopicDeletionManager = null
+  private var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = null
+  private var replicaStateMachine: ReplicaStateMachine = null
+
+  private val brokerId = 5
+  private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
+  private val controllerEpoch = 50
+  private val partition = TopicAndPartition("t", 0)
+  private val partitions = Seq(partition)
+  private val replica = PartitionAndReplica(partition.topic, partition.partition, brokerId)
+  private val replicas = Seq(replica)
+
+  @Before
+  def setUp(): Unit = {
+    controllerContext = new ControllerContext
+    controllerContext.epoch = controllerEpoch
+    mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils])
+    mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
+    mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
+    replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
+    replicaStateMachine = new ReplicaStateMachine(config, new StateChangeLogger(brokerId,
true, None), controllerContext, mockTopicDeletionManager, mockZkUtils,
+      replicaState, mockControllerBrokerRequestBatch)
+  }
+
+  @Test
+  def testNonexistentReplicaToNewReplicaTransition(): Unit = {
+    replicaStateMachine.handleStateChanges(replicas, NewReplica)
+    assertEquals(NewReplica, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidNonexistentReplicaToOnlineReplicaTransition(): Unit = {
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+    assertEquals(NonExistentReplica, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidNonexistentReplicaToOfflineReplicaTransition(): Unit = {
+    replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
+    assertEquals(NonExistentReplica, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidNonexistentReplicaToReplicaDeletionStartedTransition(): Unit = {
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted)
+    assertEquals(NonExistentReplica, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidNonexistentReplicaToReplicaDeletionIneligibleTransition(): Unit = {
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionIneligible)
+    assertEquals(NonExistentReplica, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidNonexistentReplicaToReplicaDeletionSuccessfulTransition(): Unit = {
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionSuccessful)
+    assertEquals(NonExistentReplica, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidNewReplicaToNonexistentReplicaTransition(): Unit = {
+    testInvalidTransition(NewReplica, NonExistentReplica)
+  }
+
+  @Test
+  def testNewReplicaToOnlineReplicaTransition(): Unit = {
+    replicaState.put(replica, NewReplica)
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+    assertEquals(OnlineReplica, replicaState(replica))
+  }
+
+  @Test
+  def testNewReplicaToOfflineReplicaTransition(): Unit = {
+    replicaState.put(replica, NewReplica)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, false, null))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockControllerBrokerRequestBatch)
+    replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
+    EasyMock.verify(mockControllerBrokerRequestBatch)
+    assertEquals(NewReplica, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidNewReplicaToReplicaDeletionStartedTransition(): Unit = {
+    testInvalidTransition(NewReplica, ReplicaDeletionStarted)
+  }
+
+  @Test
+  def testInvalidNewReplicaToReplicaDeletionIneligibleTransition(): Unit = {
+    testInvalidTransition(NewReplica, ReplicaDeletionIneligible)
+  }
+
+  @Test
+  def testInvalidNewReplicaToReplicaDeletionSuccessfulTransition(): Unit = {
+    testInvalidTransition(NewReplica, ReplicaDeletionSuccessful)
+  }
+
+  @Test
+  def testInvalidOnlineReplicaToNonexistentReplicaTransition(): Unit = {
+    testInvalidTransition(OnlineReplica, NonExistentReplica)
+  }
+
+  @Test
+  def testInvalidOnlineReplicaToNewReplicaTransition(): Unit = {
+    testInvalidTransition(OnlineReplica, NewReplica)
+  }
+
+  @Test
+  def testOnlineReplicaToOnlineReplicaTransition(): Unit = {
+    replicaState.put(replica, OnlineReplica)
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId,
List(brokerId)), controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew
= false))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(OnlineReplica, replicaState(replica))
+  }
+
+  @Test
+  def testOnlineReplicaToOfflineReplicaTransition(): Unit = {
+    val otherBrokerId = brokerId + 1
+    val replicaIds = List(brokerId, otherBrokerId)
+    replicaState.put(replica, OnlineReplica)
+    controllerContext.partitionReplicaAssignment.put(partition, replicaIds)
+    val leaderAndIsr = LeaderAndIsr(brokerId, replicaIds)
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, false, null))
+    val adjustedLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId))
+    val updatedLeaderAndIsr = adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr .zkVersion
+ 1)
+    val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr,
controllerEpoch)
+    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+      .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition,
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr),
controllerEpoch))
+      .andReturn(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)
+    EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
+      partition.topic, partition.partition, updatedLeaderIsrAndControllerEpoch, replicaIds,
isNew = false))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
+    replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
+    assertEquals(updatedLeaderIsrAndControllerEpoch, controllerContext.partitionLeadershipInfo(partition))
+    assertEquals(OfflineReplica, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidOnlineReplicaToReplicaDeletionStartedTransition(): Unit = {
+    testInvalidTransition(OnlineReplica, ReplicaDeletionStarted)
+  }
+
+  @Test
+  def testInvalidOnlineReplicaToReplicaDeletionIneligibleTransition(): Unit = {
+    testInvalidTransition(OnlineReplica, ReplicaDeletionIneligible)
+  }
+
+  @Test
+  def testInvalidOnlineReplicaToReplicaDeletionSuccessfulTransition(): Unit = {
+    testInvalidTransition(OnlineReplica, ReplicaDeletionSuccessful)
+  }
+
+  @Test
+  def testInvalidOfflineReplicaToNonexistentReplicaTransition(): Unit = {
+    testInvalidTransition(OfflineReplica, NonExistentReplica)
+  }
+
+  @Test
+  def testInvalidOfflineReplicaToNewReplicaTransition(): Unit = {
+    testInvalidTransition(OfflineReplica, NewReplica)
+  }
+
+  @Test
+  def testOfflineReplicaToOnlineReplicaTransition(): Unit = {
+    replicaState.put(replica, OfflineReplica)
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId,
List(brokerId)), controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew
= false))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(OnlineReplica, replicaState(replica))
+  }
+
+  @Test
+  def testOfflineReplicaToReplicaDeletionStartedTransition(): Unit = {
+    val callbacks = (new Callbacks.CallbackBuilder).build
+    replicaState.put(replica, OfflineReplica)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, true, callbacks.stopReplicaResponseCallback))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted, callbacks)
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(ReplicaDeletionStarted, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidOfflineReplicaToReplicaDeletionIneligibleTransition(): Unit = {
+    testInvalidTransition(OfflineReplica, ReplicaDeletionIneligible)
+  }
+
+  @Test
+  def testInvalidOfflineReplicaToReplicaDeletionSuccessfulTransition(): Unit = {
+    testInvalidTransition(OfflineReplica, ReplicaDeletionSuccessful)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionStartedToNonexistentReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionStarted, NonExistentReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionStartedToNewReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionStarted, NewReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionStartedToOnlineReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionStarted, OnlineReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionStartedToOfflineReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionStarted, OfflineReplica)
+  }
+
+  @Test
+  def testReplicaDeletionStartedToReplicaDeletionIneligibleTransition(): Unit = {
+    replicaState.put(replica, ReplicaDeletionStarted)
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionIneligible)
+    assertEquals(ReplicaDeletionIneligible, replicaState(replica))
+  }
+
+  @Test
+  def testReplicaDeletionStartedToReplicaDeletionSuccessfulTransition(): Unit = {
+    replicaState.put(replica, ReplicaDeletionStarted)
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionSuccessful)
+    assertEquals(ReplicaDeletionSuccessful, replicaState(replica))
+  }
+
+  @Test
+  def testReplicaDeletionSuccessfulToNonexistentReplicaTransition(): Unit = {
+    replicaState.put(replica, ReplicaDeletionSuccessful)
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    replicaStateMachine.handleStateChanges(replicas, NonExistentReplica)
+    assertEquals(Seq.empty, controllerContext.partitionReplicaAssignment(partition))
+    assertEquals(None, replicaState.get(replica))
+  }
+
+  @Test
+  def testInvalidReplicaDeletionSuccessfulToNewReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionSuccessful, NewReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionSuccessfulToOnlineReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionSuccessful, OnlineReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionSuccessfulToOfflineReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionSuccessful, OfflineReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionSuccessfulToReplicaDeletionStartedTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionSuccessful, ReplicaDeletionStarted)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionSuccessfulToReplicaDeletionIneligibleTransition(): Unit =
{
+    testInvalidTransition(ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionIneligibleToNonexistentReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionIneligible, NonExistentReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionIneligibleToNewReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionIneligible, NewReplica)
+  }
+
+  @Test
+  def testReplicaDeletionIneligibleToOnlineReplicaTransition(): Unit = {
+    replicaState.put(replica, ReplicaDeletionIneligible)
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId,
List(brokerId)), controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew
= false))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(OnlineReplica, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidReplicaDeletionIneligibleToReplicaDeletionStartedTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionIneligible, ReplicaDeletionStarted)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionIneligibleToReplicaDeletionSuccessfulTransition(): Unit =
{
+    testInvalidTransition(ReplicaDeletionIneligible, ReplicaDeletionSuccessful)
+  }
+
+  private def testInvalidTransition(fromState: ReplicaState, toState: ReplicaState): Unit
= {
+    replicaState.put(replica, fromState)
+    replicaStateMachine.handleStateChanges(replicas, toState)
+    assertEquals(fromState, replicaState(replica))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
index 3f80c28..9f172f0 100644
--- a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
@@ -229,17 +229,18 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleCreation = {
+      override def handleCreation(): Unit = {
         znodeChangeHandlerCountDownLatch.countDown()
       }
-      override def handleDeletion = {}
-      override def handleDataChange = {}
       override val path: String = mockPath
     }
 
     zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
-    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val existsRequest = ExistsRequest(mockPath, null)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
CreateMode.PERSISTENT, null)
+    val responses = zookeeperClient.handle(Seq(existsRequest, createRequest))
+    assertEquals("Response code for exists should be NONODE", Code.NONODE, Code.get(responses.head.rc))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(responses.last.rc))
     assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
   }
 
@@ -249,17 +250,18 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleCreation = {}
-      override def handleDeletion = {
+      override def handleDeletion(): Unit = {
         znodeChangeHandlerCountDownLatch.countDown()
       }
-      override def handleDataChange = {}
       override val path: String = mockPath
     }
 
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
-    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
     zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(mockPath, null)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
CreateMode.PERSISTENT, null)
+    val responses = zookeeperClient.handle(Seq(createRequest, existsRequest))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(responses.last.rc))
+    assertEquals("Response code for exists should be OK", Code.OK, Code.get(responses.head.rc))
     val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse]
     assertEquals("Response code for delete should be OK", Code.OK, Code.get(deleteResponse.rc))
     assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
@@ -271,17 +273,18 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleCreation = {}
-      override def handleDeletion = {}
-      override def handleDataChange = {
+      override def handleDataChange(): Unit = {
         znodeChangeHandlerCountDownLatch.countDown()
       }
       override val path: String = mockPath
     }
 
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
-    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
     zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(mockPath, null)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
CreateMode.PERSISTENT, null)
+    val responses = zookeeperClient.handle(Seq(createRequest, existsRequest))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(responses.last.rc))
+    assertEquals("Response code for exists should be OK", Code.OK, Code.get(responses.head.rc))
     val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, Array.empty[Byte],
-1, null)).asInstanceOf[SetDataResponse]
     assertEquals("Response code for setData should be OK", Code.OK, Code.get(setDataResponse.rc))
     assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
@@ -293,7 +296,7 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
     val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
-      override def handleChildChange = {
+      override def handleChildChange(): Unit = {
         zNodeChildChangeHandlerCountDownLatch.countDown()
       }
       override val path: String = mockPath
@@ -304,6 +307,8 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
     assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
     zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
+    val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code for getChildren should be OK", Code.OK, Code.get(getChildrenResponse.rc))
     val createResponseChild1 = zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
     assertEquals("Response code for create child1 should be OK", Code.OK, Code.get(createResponseChild1.rc))
     assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
@@ -314,12 +319,9 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf")
     val stateChangeHandlerCountDownLatch = new CountDownLatch(1)
     val stateChangeHandler = new StateChangeHandler {
-      override def beforeInitializingSession = {}
-      override def afterInitializingSession = {}
-      override def onAuthFailure = {
+      override def onAuthFailure(): Unit = {
         stateChangeHandlerCountDownLatch.countDown()
       }
-      override def onConnectionTimeout = {}
     }
     new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
     assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
deleted file mode 100644
index 4b90767..0000000
--- a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.server
-
-import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
-import kafka.controller.{ControlledShutdownLeaderSelector, ControllerContext}
-import org.easymock.EasyMock
-import org.junit.{Assert, Test}
-import Assert._
-import kafka.cluster.Broker
-import kafka.utils.ZkUtils
-
-import scala.collection.mutable
-
-class ControlledShutdownLeaderSelectorTest {
-
-  @Test
-  def testSelectLeader() {
-    val topicPartition = TopicAndPartition("topic", 1)
-    val assignment = Seq(6, 5, 4, 3, 2, 1)
-    val preferredReplicaId = assignment.head
-
-    val firstIsr = List(1, 3, 6)
-    val firstLeader = 1
-
-    val zkUtils = EasyMock.mock(classOf[ZkUtils])
-    val controllerContext = new ControllerContext(zkUtils)
-    controllerContext.liveBrokers = assignment.map(Broker(_, Seq.empty, None)).toSet
-    controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3)
-    controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment)
-
-    val leaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-    val firstLeaderAndIsr = LeaderAndIsr(firstLeader, firstIsr)
-    val (secondLeaderAndIsr, secondReplicas) = leaderSelector.selectLeader(topicPartition,
firstLeaderAndIsr)
-
-    assertEquals(preferredReplicaId, secondLeaderAndIsr.leader)
-    assertEquals(Seq(1, 6), secondLeaderAndIsr.isr)
-    assertEquals(1, secondLeaderAndIsr.zkVersion)
-    assertEquals(1, secondLeaderAndIsr.leaderEpoch)
-    assertEquals(assignment, secondReplicas)
-
-    controllerContext.shuttingDownBrokerIds += preferredReplicaId
-
-    val deadBrokerId = 2
-    controllerContext.liveBrokers = controllerContext.liveOrShuttingDownBrokers.filter(_.id
!= deadBrokerId)
-    controllerContext.shuttingDownBrokerIds -= deadBrokerId
-
-    val (thirdLeaderAndIsr, thirdReplicas) = leaderSelector.selectLeader(topicPartition,
secondLeaderAndIsr)
-
-    assertEquals(1, thirdLeaderAndIsr.leader)
-    assertEquals(Seq(1), thirdLeaderAndIsr.isr)
-    assertEquals(2, thirdLeaderAndIsr.zkVersion)
-    assertEquals(2, thirdLeaderAndIsr.leaderEpoch)
-    assertEquals(Seq(6, 5, 4, 3, 1), thirdReplicas)
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 1dd0808..56f5f6d 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -136,7 +136,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
       securityProtocol))
     val nodes = brokers.map(_.getNode(listenerName))
 
-    val controllerContext = new ControllerContext(zkUtils)
+    val controllerContext = new ControllerContext
     controllerContext.liveBrokers = brokers.toSet
     val metrics = new Metrics
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig,
Time.SYSTEM,


Mime
View raw message