kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-6193; Only delete reassign_partitions znode after reassignment is complete
Date Wed, 06 Dec 2017 20:26:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b69093398 -> d543e19a0


KAFKA-6193; Only delete reassign_partitions znode after reassignment is complete

- Ensure that `partitionsBeingReassigned` is fully populated before
`removePartitionFromReassignedPartitions` is invoked. This is
necessary to avoid premature deletion of the `reassign_partitions`
znode (see the sketch after this list).
- Modify and add tests to verify the fixes.
- Add documentation.
- Use `info` log message if assignedReplicas == newReplicas and
remove control flow based on exceptions.
- General logging improvements.
- Simplify `initializePartitionReassignment` by relying on logic already
present in `maybeTriggerPartitionReassignment`.
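
To illustrate the first point, here is a minimal, self-contained sketch (not
the controller's actual code; names and types are simplified stand-ins) of why
the in-memory set must be fully populated first: the znode is only deleted when
the last tracked partition is removed, so removing partitions while the set is
still being filled can delete the znode with work outstanding.

    import scala.collection.mutable

    object ReassignmentZNodeSketch {
      // stand-ins for the reassign_partitions znode and the controller's in-memory view
      var znodePartitions: Set[String] = Set("orders-0", "orders-1")
      val partitionsBeingReassigned = mutable.Set.empty[String]

      def removePartitionFromReassignedPartitions(tp: String): Unit = {
        partitionsBeingReassigned -= tp
        if (partitionsBeingReassigned.isEmpty)
          znodePartitions = Set.empty // deletes the reassign_partitions znode
        else
          znodePartitions = partitionsBeingReassigned.toSet // rewrites the znode
      }

      def main(args: Array[String]): Unit = {
        // Buggy ordering: "orders-0" is processed before "orders-1" was ever added.
        partitionsBeingReassigned += "orders-0"
        removePartitionFromReassignedPartitions("orders-0")
        println(znodePartitions) // Set() -- znode deleted although orders-1 is still pending
      }
    }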

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #4283 from ijuma/kafka-6193-flaky-shouldPerformMultipleReassignmentOperationsOverVariousTopics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d543e19a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d543e19a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d543e19a

Branch: refs/heads/trunk
Commit: d543e19a09ed34ccb6fa435d430c2c715c3c7098
Parents: b690933
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Wed Dec 6 22:18:41 2017 +0200
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Dec 6 22:18:41 2017 +0200

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |   4 +-
 .../controller/ControllerChannelManager.scala   |   7 +-
 .../controller/ControllerEventManager.scala     |   4 +-
 .../kafka/controller/KafkaController.scala      | 133 ++++++++++---------
 core/src/main/scala/kafka/log/LogManager.scala  |   9 +-
 .../admin/ReassignPartitionsClusterTest.scala   |  57 ++++++--
 .../controller/ControllerEventManagerTest.scala |   2 +-
 .../unit/kafka/network/SocketServerTest.scala   |   8 +-
 8 files changed, 135 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d543e19a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0654a91..e077aa4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -618,8 +618,8 @@ public class NetworkClient implements KafkaClient {
                 break; // Disconnections in other states are logged at debug level in Selector
         }
         for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
-            log.trace("Cancelled request {} with correlation id {} due to node {} being disconnected",
request.request,
-                    request.header.correlationId(), nodeId);
+            log.trace("Cancelled request {} {} with correlation id {} due to node {} being
disconnected",
+                    request.header.apiKey(), request.request, request.header.correlationId(),
nodeId);
             if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA)
                 metadataUpdater.handleDisconnection(request.destination);
             else

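For illustration, the extra `request.header.apiKey()` argument makes the trace
line identify the request type up front; a cancelled metadata request would now
render roughly as (values hypothetical):

    Cancelled request METADATA MetadataRequest(...) with correlation id 42 due to node 1 being disconnected
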
http://git-wip-us.apache.org/repos/asf/kafka/blob/d543e19a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index a072978..1b25f5f 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -207,6 +207,8 @@ class RequestSendThread(val controllerId: Int,
                         name: String)
   extends ShutdownableThread(name = name) {
 
+  logIdent = s"[RequestSendThread controllerId=$controllerId] "
+
   private val socketTimeoutMs = config.controllerSocketTimeoutMs
 
   override def doWork(): Unit = {
@@ -248,8 +250,9 @@ class RequestSendThread(val controllerId: Int,
 
         val response = clientResponse.responseBody
 
-        stateChangeLogger.withControllerEpoch(controllerContext.epoch).trace("Received response " +
-          s"${response.toString(requestHeader.apiVersion)} for a request sent to broker $brokerNode")
+        stateChangeLogger.withControllerEpoch(controllerContext.epoch).trace(s"Received response " +
+          s"${response.toString(requestHeader.apiVersion)} for request $api with correlation id " +
+          s"${requestHeader.correlationId} sent to broker $brokerNode")
 
         if (callback != null) {
           callback(response)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d543e19a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index 8ccbfb5..b880f07 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -29,7 +29,7 @@ import scala.collection._
 object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
 }
-class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
+class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
                              eventProcessedListener: ControllerEvent => Unit) {
 
   @volatile private var _state: ControllerState = ControllerState.Idle
@@ -56,6 +56,8 @@ class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer
   }
 
    class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
+    logIdent = s"[ControllerEventThread controllerId=$controllerId] "
+
     override def doWork(): Unit = {
       queue.take() match {
         case KafkaController.ShutdownEventThread => initiateShutdown()

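The `logIdent` assignments in this patch follow Kafka's convention of a
per-instance log prefix. A minimal sketch of the idea (simplified; not Kafka's
actual `Logging` trait):

    class PrefixedLogging {
      var logIdent: String = ""
      def info(msg: String): Unit = println(s"${logIdent}INFO $msg")
    }

    object LogIdentSketch extends App {
      val log = new PrefixedLogging
      log.logIdent = s"[ControllerEventThread controllerId=0] "
      log.info("starting")
      // prints: [ControllerEventThread controllerId=0] INFO starting
    }
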
http://git-wip-us.apache.org/repos/asf/kafka/blob/d543e19a/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 42c66f6..b626ade 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -65,8 +65,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   private[controller] val kafkaScheduler = new KafkaScheduler(1)
 
   // visible for testing
-  private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
-    _ => updateMetrics())
+  private[controller] val eventManager = new ControllerEventManager(config.brokerId,
+    controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics())
 
   val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
@@ -234,7 +234,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     partitionStateMachine.startup()
 
     info(s"Ready to serve as the new controller with epoch $epoch")
-    maybeTriggerPartitionReassignment()
+    maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
     topicDeletionManager.tryTopicDeletion()
     val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
     onPreferredReplicaElection(pendingPreferredReplicaElections)
@@ -489,8 +489,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)
       //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
       removePartitionFromReassignedPartitions(topicPartition)
-      info(s"Removed partition $topicPartition from the list of reassigned partitions in
zookeeper")
-      controllerContext.partitionsBeingReassigned.remove(topicPartition)
       //12. After electing leader, the replicas and isr information changes, so resend the
update metadata request to every broker
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
       // signal delete topic thread if reassignment for some partitions belonging to topics
being deleted just completed
@@ -498,33 +496,55 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  private def initiateReassignReplicasForTopicPartition(topicPartition: TopicPartition,
-                                                        reassignedPartitionContext: ReassignedPartitionsContext) {
-    val newReplicas = reassignedPartitionContext.newReplicas
-    val topic = topicPartition.topic
-    try {
-      val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicPartition)
-      assignedReplicasOpt match {
-        case Some(assignedReplicas) =>
-          if (assignedReplicas == newReplicas) {
-            throw new KafkaException(s"Partition $topicPartition to be reassigned is already
assigned to replicas " +
-              s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment")
-          } else {
-            info(s"Handling reassignment of partition $topicPartition to new replicas ${newReplicas.mkString(",")}")
-            // first register ISR change listener
-            reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
-            controllerContext.partitionsBeingReassigned.put(topicPartition, reassignedPartitionContext)
-            // mark topic ineligible for deletion for the partitions being reassigned
-            topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
-            onPartitionReassignment(topicPartition, reassignedPartitionContext)
-          }
-        case None => throw new KafkaException(s"Attempt to reassign partition $topicPartition that doesn't exist")
+  /**
+   * Trigger partition reassignment for the provided partitions if the assigned replicas are not the same as the
+   * reassigned replicas (as defined in `ControllerContext.partitionsBeingReassigned`) and if the topic has not been
+   * deleted.
+   *
+   * `partitionsBeingReassigned` must be populated with all partitions being reassigned before this method is invoked
+   * as explained in the method documentation of `removePartitionFromReassignedPartitions` (which is invoked by this
+   * method).
+   *
+   * @throws IllegalStateException if a partition is not in `partitionsBeingReassigned`
+   */
+  private def maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]) {
+    topicPartitions.foreach { tp =>
+      if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
+        error(s"Skipping reassignment of $tp since the topic is currently being deleted")
+        removePartitionFromReassignedPartitions(tp)
+      } else {
+        val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
+          throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +
+            s"partitionsBeingReassigned: ${controllerContext.partitionsBeingReassigned.mkString(", ")}")
+        }
+        val newReplicas = reassignedPartitionContext.newReplicas
+        val topic = tp.topic
+        controllerContext.partitionReplicaAssignment.get(tp) match {
+          case Some(assignedReplicas) =>
+            if (assignedReplicas == newReplicas) {
+              info(s"Partition $tp to be reassigned is already assigned to replicas " +
+                s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
+              removePartitionFromReassignedPartitions(tp)
+            } else {
+              try {
+                info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}")
+                // first register ISR change listener
+                reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
+                // mark topic ineligible for deletion for the partitions being reassigned
+                topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
+                onPartitionReassignment(tp, reassignedPartitionContext)
+              } catch {
+                case e: Throwable =>
+                  error(s"Error completing reassignment of partition $tp", e)
+                  // remove the partition from the admin path to unblock the admin client
+                  removePartitionFromReassignedPartitions(tp)
+              }
+            }
+          case None =>
+            error(s"Ignoring request to reassign partition $tp that doesn't exist.")
+            removePartitionFromReassignedPartitions(tp)
+        }
       }
-    } catch {
-      case e: Throwable =>
-        error(s"Error completing reassignment of partition $topicPartition", e)
-        // remove the partition from the admin path to unblock the admin client
-        removePartitionFromReassignedPartitions(topicPartition)
     }
   }
 
@@ -624,22 +644,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   private def initializePartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
     val partitionsBeingReassigned = zkClient.getPartitionReassignment
-    // check if they are already completed or topic was deleted
-    val reassignedPartitions = partitionsBeingReassigned.filter { case (tp, reassignmentReplicas) =>
-      controllerContext.partitionReplicaAssignment.get(tp) match {
-        case None => true // topic deleted
-        case Some(currentReplicas) => currentReplicas == reassignmentReplicas // reassignment completed
-      }
-    }.keys
-    reassignedPartitions.foreach(removePartitionFromReassignedPartitions)
-    val partitionsToReassign = partitionsBeingReassigned -- reassignedPartitions
-    controllerContext.partitionsBeingReassigned ++= partitionsToReassign.map { case (tp, newReplicas) =>
+    info(s"Partitions being reassigned: $partitionsBeingReassigned")
+
+    controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned.iterator.map { case (tp, newReplicas) =>
      val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, tp)
       tp -> new ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler)
     }
-    info(s"Partitions being reassigned: $partitionsBeingReassigned")
-    info(s"Partitions already reassigned: $reassignedPartitions")
-    info(s"Resuming reassignment of partitions: $partitionsToReassign")
   }
 
   private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
@@ -654,12 +664,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     (topicsToBeDeleted, topicsIneligibleForDeletion)
   }
 
-  private def maybeTriggerPartitionReassignment() {
-    controllerContext.partitionsBeingReassigned.foreach { case (tp, reassignContext) =>
-      initiateReassignReplicasForTopicPartition(tp, reassignContext)
-    }
-  }
-
   private def startChannelManager() {
    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
       stateChangeLogger, threadNamePrefix)
@@ -795,6 +799,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
+  /**
+   * Remove partition from partitions being reassigned in ZooKeeper and ControllerContext. If the partition reassignment
+   * is complete (i.e. there is no other partition with a reassignment in progress), the reassign_partitions znode
+   * is deleted.
+   *
+   * `ControllerContext.partitionsBeingReassigned` must be populated with all partitions being reassigned before this
+   * method is invoked to avoid premature deletion of the `reassign_partitions` znode.
+   */
   private def removePartitionFromReassignedPartitions(topicPartition: TopicPartition) {
    controllerContext.partitionsBeingReassigned.get(topicPartition).foreach { reassignContext =>
       reassignContext.unregisterReassignIsrChangeHandler(zkClient)
@@ -802,6 +814,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
    val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned - topicPartition
 
+    info(s"Removing partition $topicPartition from the list of reassigned partitions in zookeeper")
+
     // write the new list to zookeeper
     if (updatedPartitionsBeingReassigned.isEmpty) {
       info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}")
@@ -1281,17 +1295,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       // the `path exists` check for free
      if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
         val partitionReassignment = zkClient.getPartitionReassignment
-        val partitionsToBeReassigned = partitionReassignment -- controllerContext.partitionsBeingReassigned.keys
-        partitionsToBeReassigned.foreach { case (tp, newReplicas) =>
-          if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
-            error(s"Skipping reassignment of $tp since the topic is currently being deleted")
-            removePartitionFromReassignedPartitions(tp)
-          } else {
-            val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(KafkaController.this, eventManager,
-              tp)
-            initiateReassignReplicasForTopicPartition(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
-          }
+
+        // Populate `partitionsBeingReassigned` with all partitions being reassigned before invoking
+        // `maybeTriggerPartitionReassignment` (see method documentation for the reason)
+        partitionReassignment.foreach { case (tp, newReplicas) =>
+          val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(KafkaController.this, eventManager,
+            tp)
+          controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
         }
+
+        maybeTriggerPartitionReassignment(partitionReassignment.keySet)
       }
     }
   }

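The restructuring above also implements the commit's "remove control flow based
on exceptions" point: the already-assigned and non-existent cases become
ordinary `match` branches that log and clean up, and `try/catch` is reserved
for genuinely unexpected failures. A condensed sketch of the shape (simplified
names; the stubbed `info`/`error`/`reassign` are stand-ins, not Kafka APIs):

    object ControlFlowSketch {
      def info(msg: String): Unit = println(s"INFO $msg")
      def error(msg: String): Unit = println(s"ERROR $msg")

      def maybeReassign(tp: String, current: Option[Seq[Int]], target: Seq[Int])
                       (reassign: => Unit): Unit =
        current match {
          case Some(assigned) if assigned == target =>
            info(s"Partition $tp already assigned to ${target.mkString(",")}. Ignoring.")
          case Some(_) =>
            try reassign
            catch { case e: Throwable => error(s"Error completing reassignment of $tp: $e") }
          case None =>
            error(s"Ignoring request to reassign partition $tp that doesn't exist.")
        }
    }
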
http://git-wip-us.apache.org/repos/asf/kafka/blob/d543e19a/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 9a61be3..a7d106f 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -672,18 +672,15 @@ class LogManager(logDirs: Seq[File],
           else
             currentLogs.put(topicPartition, log)
 
-          info("Created log for partition [%s,%d] in %s with properties {%s}."
-            .format(topicPartition.topic,
-              topicPartition.partition,
-              logDir,
-              config.originals.asScala.mkString(", ")))
+          info(s"Created log for partition $topicPartition in $logDir with properties " +
+            s"{${config.originals.asScala.mkString(", ")}}.")
           // Remove the preferred log dir since it has already been satisfied
           preferredLogDirs.remove(topicPartition)
 
           log
         } catch {
           case e: IOException =>
-            val msg = s"Error while creating log for $topicPartition in dir ${logDir}"
+            val msg = s"Error while creating log for $topicPartition in dir $logDir"
             logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
             throw new KafkaStorageException(msg, e)
         }

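For illustration, the rewritten message leans on `TopicPartition.toString`
(which renders as `topic-partition`), replacing the hand-rolled `[%s,%d]`
formatting of topic and partition (sample values hypothetical):

    Before: Created log for partition [orders,0] in /tmp/kafka-logs with properties {...}.
    After:  Created log for partition orders-0 in /tmp/kafka-logs with properties {...}.
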
http://git-wip-us.apache.org/repos/asf/kafka/blob/d543e19a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 11f75e2..865ab87 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -27,13 +27,12 @@ import org.junit.{After, Before, Test}
 import kafka.admin.ReplicationQuotaUtils._
 import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.clients.admin.{AdminClient => JAdminClient}
-import org.apache.kafka.common.TopicPartitionReplica
+import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.Seq
 import scala.util.Random
-
 import java.io.File
 
 class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
@@ -51,7 +50,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   }
 
   def startBrokers(brokerIds: Seq[Int]) {
-    servers = brokerIds.map(i => createBrokerConfig(i, zkConnect, logDirCount = 3))
+    servers = brokerIds.map(i => createBrokerConfig(i, zkConnect, enableControlledShutdown = false, logDirCount = 3))
       .map(c => createServer(KafkaConfig.fromProps(c)))
   }
 
@@ -132,12 +131,12 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     val newAssignment = generateAssignment(zkUtils, brokers, json(topicName), true)._1
    // Find a partition in the new assignment on broker 102 and a random log directory on broker 102,
     // which currently does not have any partition for this topic
-    val partition1 = newAssignment.find { case (replica, brokerIds) => brokerIds.contains(102)}.get._1.partition
+    val partition1 = newAssignment.find { case (_, brokerIds) => brokerIds.contains(102) }.get._1.partition
     val replica1 = new TopicPartitionReplica(topicName, partition1, 102)
     val expectedLogDir1 = getRandomLogDirAssignment(102)
    // Find a partition in the new assignment on broker 100 and a random log directory on broker 100,
     // which currently has partition for this topic
-    val partition2 = newAssignment.find { case (replica, brokerIds) => brokerIds.contains(100)}.get._1.partition
+    val partition2 = newAssignment.find { case (_, brokerIds) => brokerIds.contains(100) }.get._1.partition
     val replica2 = new TopicPartitionReplica(topicName, partition2, 100)
     val expectedLogDir2 = getRandomLogDirAssignment(100)
    // Generate a replica assignment to reassign replicas on broker 100 and 102 respectively to a random log directory on the same broker.
@@ -227,7 +226,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     assertEquals(Seq(100, 102), actual("topic2")(2))//changed
 
     // The replicas should be in the expected log directories
-    val replicaDirs = adminClient.describeReplicaLogDirs(List(replica1, replica2).asJavaCollection).all().get()
+    val replicaDirs = adminClient.describeReplicaLogDirs(List(replica1, replica2).asJava).all().get()
     assertEquals(proposedReplicaAssignment(replica1), replicaDirs.get(replica1).getCurrentReplicaLogDir)
     assertEquals(proposedReplicaAssignment(replica2), replicaDirs.get(replica2).getCurrentReplicaLogDir)
   }
@@ -434,8 +433,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
   def shouldPerformThrottledReassignmentOverVariousTopics() {
     val throttle = Throttle(1000L)
 
-    //Given four brokers
-    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(conf => TestUtils.createServer(KafkaConfig.fromProps(conf)))
+    startBrokers(Seq(0, 1, 2, 3))
 
     //With up several small topics
     createTopic(zkUtils, "orders", Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2)), servers)
@@ -478,7 +476,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
    */
   @Test
   def shouldPerformMultipleReassignmentOperationsOverVariousTopics() {
-    servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(conf => TestUtils.createServer(KafkaConfig.fromProps(conf)))
+    startBrokers(Seq(0, 1, 2, 3))
 
     createTopic(zkUtils, "orders", Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2)), servers)
     createTopic(zkUtils, "payments", Map(0 -> List(0, 1), 1 -> List(0, 1)), servers)
@@ -493,7 +491,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     )
 
     new ReassignPartitionsCommand(zkUtils, None, firstMove).reassignPartitions()
-    waitForReassignmentToComplete()
+    // Low pause to detect deletion of the reassign_partitions znode before the reassignment is complete
+    waitForReassignmentToComplete(pause = 1L)
 
     // Check moved replicas did move
     assertEquals(Seq(0, 2, 3), zkUtils.getReplicasForPartition("orders", 0))
@@ -517,7 +516,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     )
 
     new ReassignPartitionsCommand(zkUtils, None, secondMove).reassignPartitions()
-    waitForReassignmentToComplete()
+    // Low pause to detect deletion of the reassign_partitions znode before the reassignment is complete
+    waitForReassignmentToComplete(pause = 1L)
 
     // Check moved replicas did move
     assertEquals(Seq(0, 2, 3), zkUtils.getReplicasForPartition("orders", 0))
@@ -550,7 +550,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
       }
     }.exists(identity)
 
-    waitForReassignmentToComplete()
+    // Low pause to detect deletion of the reassign_partitions znode before the reassignment is complete
+    waitForReassignmentToComplete(pause = 1L)
 
     // Check moved replicas for thirdMove and fourthMove
     assertEquals(Seq(1, 2, 3), zkUtils.getReplicasForPartition("orders", 0))
@@ -566,8 +567,36 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     assertEquals(Seq(3), zkUtils.getReplicasForPartition("customers", 3))
   }
 
-  def waitForReassignmentToComplete() {
-    waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode ${ZkUtils.ReassignPartitionsPath} wasn't deleted")
+  /**
+   * Set the `reassign_partitions` znode while the brokers are down and verify that the reassignment is triggered by
+   * the Controller during start-up.
+   */
+  @Test
+  def shouldTriggerReassignmentOnControllerStartup(): Unit = {
+    startBrokers(Seq(0, 1, 2))
+    createTopic(zkUtils, "orders", Map(0 -> List(0, 1), 1 -> List(1, 2)), servers)
+    servers.foreach(_.shutdown())
+
+    val firstMove = Map(
+      new TopicPartition("orders", 0) -> Seq(2, 1), // moves
+      new TopicPartition("orders", 1) -> Seq(1, 2), // stays
+      new TopicPartition("customers", 0) -> Seq(1, 2) // non-existent topic, triggers
topic deleted path
+    )
+
+    // Set znode directly to avoid non-existent topic validation
+    zkClient.setOrCreatePartitionReassignment(firstMove)
+
+    servers.foreach(_.startup())
+    waitForReassignmentToComplete()
+
+    assertEquals(Seq(2, 1), zkUtils.getReplicasForPartition("orders", 0))
+    assertEquals(Seq(1, 2), zkUtils.getReplicasForPartition("orders", 1))
+    assertEquals(Seq.empty, zkUtils.getReplicasForPartition("customers", 0))
+  }
+
+  def waitForReassignmentToComplete(pause: Long = 100L) {
+    waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath),
+      s"Znode ${ZkUtils.ReassignPartitionsPath} wasn't deleted", pause = pause)
   }
 
   def json(topic: String*): String = {

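The configurable `pause` matters because the helper polls the znode: a 1 ms
pause samples it often enough to catch a znode deleted before the reassignment
finished. A minimal sketch of the polling contract (modeled loosely on the
shape of `TestUtils.waitUntilTrue`; not the actual implementation):

    object PollSketch {
      import scala.annotation.tailrec

      def waitUntilTrue(condition: () => Boolean, msg: => String,
                        waitTimeMs: Long = 15000L, pause: Long = 100L): Unit = {
        val deadline = System.currentTimeMillis + waitTimeMs
        @tailrec def poll(): Unit =
          if (condition()) ()
          else if (System.currentTimeMillis > deadline) throw new AssertionError(msg)
          else { Thread.sleep(pause); poll() }
        poll()
      }
    }
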
http://git-wip-us.apache.org/repos/asf/kafka/blob/d543e19a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
index ec9343e..e5753e5 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -53,7 +53,7 @@ class ControllerEventManagerTest {
  private def check(metricName: String, controllerState: ControllerState, process: () => Unit): Unit = {
     val controllerStats = new ControllerStats
     val eventProcessedListenerCount = new AtomicInteger
-    controllerEventManager = new ControllerEventManager(controllerStats.rateAndTimeMetrics,
+    controllerEventManager = new ControllerEventManager(0, controllerStats.rateAndTimeMetrics,
       _ => eventProcessedListenerCount.incrementAndGet)
     controllerEventManager.start()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d543e19a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index f1f302a..9bc7437 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -72,13 +72,15 @@ class SocketServerTest extends JUnitSuite {
   val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider)
   server.startup()
   val sockets = new ArrayBuffer[Socket]
+
+  private val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafka")
   private var logLevelToRestore: Level = _
 
   @Before
   def setUp(): Unit = {
     // Run the tests with TRACE logging to exercise request logging path
-    logLevelToRestore = org.apache.log4j.LogManager.getRootLogger.getLevel
-    org.apache.log4j.LogManager.getLogger("kafka").setLevel(Level.TRACE)
+    logLevelToRestore = kafkaLogger.getLevel
+    kafkaLogger.setLevel(Level.TRACE)
   }
 
   @After
@@ -86,7 +88,7 @@ class SocketServerTest extends JUnitSuite {
     shutdownServerAndMetrics(server)
     sockets.foreach(_.close())
     sockets.clear()
-    org.apache.log4j.LogManager.getLogger("kafka").setLevel(logLevelToRestore)
+    kafkaLogger.setLevel(logLevelToRestore)
   }
 
  def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None, flush: Boolean = true) {

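The test now caches the "kafka" logger and restores that logger's own prior
level (previously it read the root logger's level but mutated the "kafka"
logger, a mismatch). A sketch of the save/restore pattern against the log4j
1.x API used here (the body comment is a placeholder):

    import org.apache.log4j.{Level, LogManager, Logger}

    object LogLevelSketch extends App {
      val kafkaLogger: Logger = LogManager.getLogger("kafka")
      val levelToRestore: Level = kafkaLogger.getLevel // may be null if inherited
      kafkaLogger.setLevel(Level.TRACE)
      try {
        // ... exercise code paths that log at TRACE ...
      } finally kafkaLogger.setLevel(levelToRestore)
    }
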
