kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8286; Generalized Leader Election Admin RPC (KIP-460) (#6686)
Date Wed, 29 May 2019 17:45:10 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 121308c  KAFKA-8286; Generalized Leader Election Admin RPC (KIP-460) (#6686)
121308c is described below

commit 121308cc7a2639a70fc8a60c99d2eaee52931951
Author: José Armando García Sancio <jsancio@users.noreply.github.com>
AuthorDate: Wed May 29 10:44:52 2019 -0700

    KAFKA-8286; Generalized Leader Election Admin RPC (KIP-460) (#6686)
    
    Implements KIP-460: https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC.
    
    Reviewers: Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 bin/kafka-leader-election.sh                       |  17 ++
 bin/windows/kafka-leader-election.bat              |  17 ++
 .../apache/kafka/clients/admin/AdminClient.java    |  95 +++++---
 ...eadersOptions.java => ElectLeadersOptions.java} |   6 +-
 .../kafka/clients/admin/ElectLeadersResult.java    |  76 +++++++
 .../admin/ElectPreferredLeadersOptions.java        |   4 +-
 .../clients/admin/ElectPreferredLeadersResult.java | 118 ++++------
 .../kafka/clients/admin/KafkaAdminClient.java      |  46 ++--
 .../ElectionType.java}                             |  29 ++-
 .../errors/ElectionNotNeededException.java}        |  19 +-
 .../EligibleLeadersNotAvailableException.java}     |  19 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   8 +-
 .../org/apache/kafka/common/protocol/Errors.java   |   7 +-
 .../kafka/common/requests/AbstractRequest.java     |   4 +-
 .../kafka/common/requests/AbstractResponse.java    |   4 +-
 .../org/apache/kafka/common/requests/ApiError.java |   4 +
 .../kafka/common/requests/ElectLeadersRequest.java | 134 +++++++++++
 .../common/requests/ElectLeadersResponse.java      | 129 +++++++++++
 .../requests/ElectPreferredLeadersRequest.java     | 129 -----------
 .../requests/ElectPreferredLeadersResponse.java    |  83 -------
 ...eadersRequest.json => ElectLeadersRequest.json} |  13 +-
 ...dersResponse.json => ElectLeadersResponse.json} |   6 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 111 ++++-----
 .../kafka/clients/admin/MockAdminClient.java       |  11 +
 .../kafka/common/requests/RequestResponseTest.java |  82 +++----
 core/src/main/scala/kafka/Kafka.scala              |   2 +-
 .../scala/kafka/admin/LeaderElectionCommand.scala  | 240 ++++++++++++++++++++
 .../PreferredReplicaLeaderElectionCommand.scala    | 121 +++++-----
 core/src/main/scala/kafka/admin/TopicCommand.scala |   2 +-
 core/src/main/scala/kafka/api/package.scala        |  47 ++++
 core/src/main/scala/kafka/cluster/Partition.scala  |   8 +-
 .../src/main/scala/kafka/controller/Election.scala |  15 +-
 .../scala/kafka/controller/KafkaController.scala   | 247 ++++++++++++--------
 .../kafka/controller/PartitionStateMachine.scala   | 249 ++++++++++++++-------
 .../kafka/controller/ReplicaStateMachine.scala     | 165 ++++++++------
 core/src/main/scala/kafka/log/LogManager.scala     |   5 +-
 ...ferredLeader.scala => DelayedElectLeader.scala} |  17 +-
 .../scala/kafka/server/DelayedOperationKey.scala   |   7 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 150 ++++++++-----
 .../main/scala/kafka/server/ReplicaManager.scala   |  75 ++++---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  75 ++++---
 .../kafka/api/AdminClientIntegrationTest.scala     |  98 ++++----
 .../kafka/api/AuthorizerIntegrationTest.scala      |  31 ++-
 .../PreferredReplicaElectionCommandTest.scala      |  70 ------
 ...PreferredReplicaLeaderElectionCommandTest.scala |  54 ++++-
 .../controller/MockPartitionStateMachine.scala     |  53 +++--
 .../controller/PartitionStateMachineTest.scala     | 131 +++++++++--
 .../kafka/controller/ReplicaStateMachineTest.scala |   2 +-
 .../AbstractCoordinatorConcurrencyTest.scala       |   2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  12 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  38 ++--
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    |   2 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  20 +-
 docs/upgrade.html                                  |   6 +
 gradle/spotbugs-exclude.xml                        |   6 +-
 .../integration/utils/EmbeddedKafkaCluster.java    |   7 +-
 56 files changed, 2010 insertions(+), 1118 deletions(-)

diff --git a/bin/kafka-leader-election.sh b/bin/kafka-leader-election.sh
new file mode 100755
index 0000000..88baef3
--- /dev/null
+++ b/bin/kafka-leader-election.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.LeaderElectionCommand "$@"
diff --git a/bin/windows/kafka-leader-election.bat b/bin/windows/kafka-leader-election.bat
new file mode 100644
index 0000000..0432a99
--- /dev/null
+++ b/bin/windows/kafka-leader-election.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+"%~dp0kafka-run-class.bat" kafka.admin.LeaderElectionCommand %*
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 8826f83..ad9409a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -17,6 +17,14 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -26,12 +34,6 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigResource;
 
-import java.time.Duration;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
 /**
  * The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
  *
@@ -839,35 +841,75 @@ public abstract class AdminClient implements AutoCloseable {
     }
 
     /**
-     * Elect the preferred broker of the given {@code partitions} as leader, or
-     * elect the preferred broker for all partitions as leader if the argument to {@code partitions} is null.
+     * Elect the preferred replica as leader for topic partitions.
      *
-     * This is a convenience method for {@link #electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}
-     * with default options.
-     * See the overload for more details.
+     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
+     * with preferred election type and default options.
+     *
+     * This operation is supported by brokers with version 2.2.0 or higher.
      *
      * @param partitions      The partitions for which the preferred leader should be elected.
      * @return                The ElectPreferredLeadersResult.
+     * @deprecated            Since 2.4.0. Use {@link #electLeaders(ElectionType, Set)}.
      */
+    @Deprecated
     public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions) {
         return electPreferredLeaders(partitions, new ElectPreferredLeadersOptions());
     }
 
     /**
-     * Elect the preferred broker of the given {@code partitions} as leader, or
-     * elect the preferred broker for all partitions as leader if the argument to {@code partitions} is null.
+     * Elect the preferred replica as leader for topic partitions.
+     *
+     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
+     * with preferred election type.
+     *
+     * This operation is supported by brokers with version 2.2.0 or higher.
+     *
+     * @param partitions      The partitions for which the preferred leader should be elected.
+     * @param options         The options to use when electing the preferred leaders.
+     * @return                The ElectPreferredLeadersResult.
+     * @deprecated            Since 2.4.0. Use {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}.
+     */
+    @Deprecated
+    public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions,
+                                                             ElectPreferredLeadersOptions options) {
+        final ElectLeadersOptions newOptions = new ElectLeadersOptions();
+        newOptions.timeoutMs(options.timeoutMs());
+        final Set<TopicPartition> topicPartitions = partitions == null ? null : new HashSet<>(partitions);
+
+        return new ElectPreferredLeadersResult(electLeaders(ElectionType.PREFERRED, topicPartitions, newOptions));
+    }
+
+    /**
+     * Elect a replica as leader for topic partitions.
+     *
+     * This is a convenience method for {@link #electLeaders(ElectionType, Set, ElectLeadersOptions)}
+     * with default options.
+     *
+     * @param electionType            The type of election to conduct.
+     * @param partitions              The topics and partitions for which to conduct elections.
+     * @return                        The ElectLeadersResult.
+     */
+    public ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions) {
+        return electLeaders(electionType, partitions, new ElectLeadersOptions());
+    }
+
+    /**
+     * Elect a replica as leader for the given {@code partitions}, or for all partitions if the argument
+     * to {@code partitions} is null.
      *
      * This operation is not transactional so it may succeed for some partitions while fail for others.
      *
-     * It may take several seconds after this method returns
-     * success for all the brokers in the cluster to become aware that the partitions have new leaders.
-     * During this time, {@link AdminClient#describeTopics(Collection)}
-     * may not return information about the partitions' new leaders.
+     * It may take several seconds after this method returns success for all the brokers in the cluster
+     * to become aware that the partitions have new leaders. During this time,
+     * {@link AdminClient#describeTopics(Collection)} may not return information about the partitions'
+     * new leaders.
      *
-     * This operation is supported by brokers with version 2.2.0 or higher.
+     * This operation is supported by brokers with version 2.2.0 or later if preferred election is used;
+     * otherwise the brokers must be 2.4.0 or higher.
      *
-     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
-     * the returned {@code ElectPreferredLeadersResult}:</p>
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the future obtained
+     * from the returned {@code ElectLeadersResult}:</p>
      * <ul>
      *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
      *   if the authenticated user didn't have alter access to the cluster.</li>
@@ -883,12 +925,15 @@ public abstract class AdminClient implements AutoCloseable {
      *   if the preferred leader was not alive or not in the ISR.</li>
      * </ul>
      *
-     * @param partitions      The partitions for which the preferred leader should be elected.
-     * @param options         The options to use when electing the preferred leaders.
-     * @return                The ElectPreferredLeadersResult.
+     * @param electionType            The type of election to conduct.
+     * @param partitions              The topics and partitions for which to conduct elections.
+     * @param options                 The options to use when electing the leaders.
+     * @return                        The ElectLeadersResult.
      */
-    public abstract ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions,
-                                                                      ElectPreferredLeadersOptions options);
+    public abstract ElectLeadersResult electLeaders(
+            ElectionType electionType,
+            Set<TopicPartition> partitions,
+            ElectLeadersOptions options);
 
     /**
      * Get the metrics kept by the adminClient
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersOptions.java
similarity index 81%
copy from clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersOptions.java
index 80b0097..e0a08de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersOptions.java
@@ -19,13 +19,11 @@ package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Collection;
-
 /**
- * Options for {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}.
+ * Options for {@link AdminClient#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
  *
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class ElectPreferredLeadersOptions extends AbstractOptions<ElectPreferredLeadersOptions> {
+final public class ElectLeadersOptions extends AbstractOptions<ElectLeadersOptions> {
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
new file mode 100644
index 0000000..b4aceba
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectLeadersResult.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+/**
+ * The result of {@link AdminClient#electLeaders(ElectionType, Set, ElectLeadersOptions)}
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+final public class ElectLeadersResult {
+    private final KafkaFutureImpl<Map<TopicPartition, Optional<Throwable>>> electionFuture;
+
+    ElectLeadersResult(KafkaFutureImpl<Map<TopicPartition, Optional<Throwable>>> electionFuture) {
+        this.electionFuture = electionFuture;
+    }
+
+    /**
+     * <p>Get a future for the topic partitions for which a leader election was attempted.
+     * If the election succeeded then the value for a topic partition will be the empty Optional.
+     * Otherwise the election failed and the Optional will be set with the error.</p>
+     */
+    public KafkaFuture<Map<TopicPartition, Optional<Throwable>>> partitions() {
+        return electionFuture;
+    }
+
+    /**
+     * Return a future which succeeds if all the topic elections succeed.
+     */
+    public KafkaFuture<Void> all() {
+        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+
+        partitions().whenComplete(
+                new KafkaFuture.BiConsumer<Map<TopicPartition, Optional<Throwable>>, Throwable>() {
+                    @Override
+                    public void accept(Map<TopicPartition, Optional<Throwable>> topicPartitions, Throwable throwable) {
+                        if (throwable != null) {
+                            result.completeExceptionally(throwable);
+                        } else {
+                            for (Optional<Throwable> exception : topicPartitions.values()) {
+                                if (exception.isPresent()) {
+                                    result.completeExceptionally(exception.get());
+                                    return;
+                                }
+                            }
+                            result.complete(null);
+                        }
+                    }
+                });
+
+        return result;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
index 80b0097..c59aeb3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
@@ -18,14 +18,16 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
-
 import java.util.Collection;
 
 /**
  * Options for {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}.
  *
  * The API of this class is evolving, see {@link AdminClient} for details.
+ *
+ * @deprecated Since 2.4.0. Use {@link AdminClient#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
  */
 @InterfaceStability.Evolving
+@Deprecated
 public class ElectPreferredLeadersOptions extends AbstractOptions<ElectPreferredLeadersOptions> {
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java
index 963c5f1..5a98d5f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java
@@ -17,33 +17,31 @@
 
 package org.apache.kafka.clients.admin;
 
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ApiError;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
 
 /**
  * The result of {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}
  *
  * The API of this class is evolving, see {@link AdminClient} for details.
+ *
+ * @deprecated Since 2.4.0. Use {@link AdminClient#electLeaders(ElectionType, Set, ElectLeadersOptions)}.
  */
 @InterfaceStability.Evolving
+@Deprecated
 public class ElectPreferredLeadersResult {
+    private final ElectLeadersResult electionResult;
 
-    private final KafkaFutureImpl<Map<TopicPartition, ApiError>> electionFuture;
-    private final Set<TopicPartition> partitions;
-
-    ElectPreferredLeadersResult(KafkaFutureImpl<Map<TopicPartition, ApiError>> electionFuture, Set<TopicPartition> partitions) {
-        this.electionFuture = electionFuture;
-        this.partitions = partitions;
+    ElectPreferredLeadersResult(ElectLeadersResult electionResult) {
+        this.electionResult = electionResult;
     }
 
     /**
@@ -53,30 +51,28 @@ public class ElectPreferredLeadersResult {
      */
     public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
         final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
-        electionFuture.whenComplete(new KafkaFuture.BiConsumer<Map<TopicPartition, ApiError>, Throwable>() {
-            @Override
-            public void accept(Map<TopicPartition, ApiError> topicPartitions, Throwable throwable) {
-                if (throwable != null) {
-                    result.completeExceptionally(throwable);
-                } else if (!topicPartitions.containsKey(partition)) {
-                    result.completeExceptionally(new UnknownTopicOrPartitionException(
-                            "Preferred leader election for partition \"" + partition +
-                                    "\" was not attempted"));
-                } else if (partitions == null && topicPartitions.isEmpty()) {
-                    // If partitions is null, we requested information about all partitions.  In
-                    // that case, if topicPartitions is empty, that indicates a
-                    // CLUSTER_AUTHORIZATION_FAILED error.
-                    result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
-                } else {
-                    ApiException exception = topicPartitions.get(partition).exception();
-                    if (exception == null) {
-                        result.complete(null);
-                    } else {
-                        result.completeExceptionally(exception);
+
+        electionResult.partitions().whenComplete(
+                new KafkaFuture.BiConsumer<Map<TopicPartition, Optional<Throwable>>, Throwable>() {
+                    @Override
+                    public void accept(Map<TopicPartition, Optional<Throwable>> topicPartitions, Throwable throwable) {
+                        if (throwable != null) {
+                            result.completeExceptionally(throwable);
+                        } else if (!topicPartitions.containsKey(partition)) {
+                            result.completeExceptionally(new UnknownTopicOrPartitionException(
+                                        "Preferred leader election for partition \"" + partition +
+                                        "\" was not attempted"));
+                        } else {
+                            Optional<Throwable> exception = topicPartitions.get(partition);
+                            if (exception.isPresent()) {
+                                result.completeExceptionally(exception.get());
+                            } else {
+                                result.complete(null);
+                            }
+                        }
                     }
-                }
-            }
-        });
+                });
+
         return result;
     }
 
@@ -90,49 +86,27 @@ public class ElectPreferredLeadersResult {
      * with a null {@code partitions} argument.</p>
      */
     public KafkaFuture<Set<TopicPartition>> partitions() {
-        if (partitions != null) {
-            return KafkaFutureImpl.completedFuture(this.partitions);
-        } else {
-            final KafkaFutureImpl<Set<TopicPartition>> result = new KafkaFutureImpl<>();
-            electionFuture.whenComplete(new KafkaFuture.BiConsumer<Map<TopicPartition, ApiError>, Throwable>() {
-                @Override
-                public void accept(Map<TopicPartition, ApiError> topicPartitions, Throwable throwable) {
-                    if (throwable != null) {
-                        result.completeExceptionally(throwable);
-                    } else if (topicPartitions.isEmpty()) {
-                        result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
-                    } else {
-                        for (ApiError apiError : topicPartitions.values()) {
-                            if (apiError.isFailure()) {
-                                result.completeExceptionally(apiError.exception());
-                            }
+        final KafkaFutureImpl<Set<TopicPartition>> result = new KafkaFutureImpl<>();
+
+        electionResult.partitions().whenComplete(
+                new KafkaFuture.BiConsumer<Map<TopicPartition, Optional<Throwable>>, Throwable>() {
+                    @Override
+                    public void accept(Map<TopicPartition, Optional<Throwable>> topicPartitions, Throwable throwable) {
+                        if (throwable != null) {
+                            result.completeExceptionally(throwable);
+                        } else {
+                            result.complete(topicPartitions.keySet());
                         }
-                        result.complete(topicPartitions.keySet());
                     }
-                }
-            });
-            return result;
-        }
+                });
+
+        return result;
     }
 
     /**
      * Return a future which succeeds if all the topic elections succeed.
      */
     public KafkaFuture<Void> all() {
-        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
-        electionFuture.thenApply(new KafkaFuture.Function<Map<TopicPartition, ApiError>, Void>() {
-            @Override
-            public Void apply(Map<TopicPartition, ApiError> topicPartitions) {
-                for (ApiError apiError : topicPartitions.values()) {
-                    if (apiError.isFailure()) {
-                        result.completeExceptionally(apiError.exception());
-                        return null;
-                    }
-                }
-                result.complete(null);
-                return null;
-            }
-        });
-        return result;
+        return electionResult.all();
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 9b37f5a..e01ecf3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Metric;
@@ -89,22 +90,21 @@ import org.apache.kafka.common.requests.AlterConfigsResponse;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
 import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
-import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
 import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
-import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
+import org.apache.kafka.common.requests.CreatePartitionsRequest;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
-import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
-import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
@@ -121,10 +121,11 @@ import org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.requests.DescribeLogDirsRequest;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
-import org.apache.kafka.common.requests.ElectPreferredLeadersRequest;
-import org.apache.kafka.common.requests.ElectPreferredLeadersResponse;
+import org.apache.kafka.common.requests.ElectLeadersRequest;
+import org.apache.kafka.common.requests.ElectLeadersResponse;
 import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
@@ -3016,25 +3017,33 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public ElectPreferredLeadersResult electPreferredLeaders(final Collection<TopicPartition> partitions,
-                                                             ElectPreferredLeadersOptions options) {
-        final Set<TopicPartition> partitionSet = partitions != null ? new HashSet<>(partitions) : null;
-        final KafkaFutureImpl<Map<TopicPartition, ApiError>> electionFuture = new KafkaFutureImpl<>();
+    public ElectLeadersResult electLeaders(
+            final ElectionType electionType,
+            final Set<TopicPartition> topicPartitions,
+            ElectLeadersOptions options) {
+        final KafkaFutureImpl<Map<TopicPartition, Optional<Throwable>>> electionFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
-        runnable.call(new Call("electPreferredLeaders", calcDeadlineMs(now, options.timeoutMs()),
+        runnable.call(new Call("electLeaders", calcDeadlineMs(now, options.timeoutMs()),
                 new ControllerNodeProvider()) {
 
             @Override
             public AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new ElectPreferredLeadersRequest.Builder(
-                        ElectPreferredLeadersRequest.toRequestData(partitions, timeoutMs));
+                return new ElectLeadersRequest.Builder(electionType, topicPartitions, timeoutMs);
             }
 
             @Override
             public void handleResponse(AbstractResponse abstractResponse) {
-                ElectPreferredLeadersResponse response = (ElectPreferredLeadersResponse) abstractResponse;
-                electionFuture.complete(
-                        ElectPreferredLeadersRequest.fromResponseData(response.data()));
+                ElectLeadersResponse response = (ElectLeadersResponse) abstractResponse;
+                Map<TopicPartition, Optional<Throwable>> result = ElectLeadersResponse.electLeadersResult(response.data());
+
+                // In a version 0 response errorCode is 0, which maps to Errors.NONE
+                Errors error = Errors.forCode(response.data().errorCode());
+                if (error != Errors.NONE) {
+                    electionFuture.completeExceptionally(error.exception());
+                    return;
+                }
+
+                electionFuture.complete(result);
             }
 
             @Override
@@ -3042,6 +3051,7 @@ public class KafkaAdminClient extends AdminClient {
                 electionFuture.completeExceptionally(throwable);
             }
         }, now);
-        return new ElectPreferredLeadersResult(electionFuture, partitionSet);
+
+        return new ElectLeadersResult(electionFuture);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java b/clients/src/main/java/org/apache/kafka/common/ElectionType.java
similarity index 51%
copy from clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
copy to clients/src/main/java/org/apache/kafka/common/ElectionType.java
index 80b0097..c5b5e50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
+++ b/clients/src/main/java/org/apache/kafka/common/ElectionType.java
@@ -15,17 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.clients.admin;
+package org.apache.kafka.common;
 
+import java.util.Arrays;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Collection;
-
 /**
- * Options for {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}.
+ * The type of election requested through {@link org.apache.kafka.clients.admin.AdminClient#electLeaders(ElectionType, java.util.Set, org.apache.kafka.clients.admin.ElectLeadersOptions)}.
  *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * The API of this class is evolving, see {@link org.apache.kafka.clients.admin.AdminClient} for details.
  */
 @InterfaceStability.Evolving
-public class ElectPreferredLeadersOptions extends AbstractOptions<ElectPreferredLeadersOptions> {
+public enum ElectionType {
+    PREFERRED((byte) 0), UNCLEAN((byte) 1);
+
+    public final byte value;
+
+    ElectionType(byte code) {
+        value = code;
+    }
+
+    /** Returns the constant whose {@link #value} equals {@code value}; throws IllegalArgumentException if none does. */
+    public static ElectionType valueOf(byte value) {
+        for (ElectionType candidate : values()) {
+            if (candidate.value == value) {
+                return candidate;
+            }
+        }
+        throw new IllegalArgumentException(
+                String.format("Value %s must be one of %s", value, Arrays.asList(ElectionType.values())));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java b/clients/src/main/java/org/apache/kafka/common/errors/ElectionNotNeededException.java
similarity index 64%
copy from clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
copy to clients/src/main/java/org/apache/kafka/common/errors/ElectionNotNeededException.java
index 80b0097..74fc7d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ElectionNotNeededException.java
@@ -14,18 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.common.errors;
 
-package org.apache.kafka.clients.admin;
+public class ElectionNotNeededException extends InvalidMetadataException {
 
-import org.apache.kafka.common.annotation.InterfaceStability;
+    public ElectionNotNeededException(String message) {
+        super(message);
+    }
 
-import java.util.Collection;
-
-/**
- * Options for {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}.
- *
- * The API of this class is evolving, see {@link AdminClient} for details.
- */
-@InterfaceStability.Evolving
-public class ElectPreferredLeadersOptions extends AbstractOptions<ElectPreferredLeadersOptions> {
+    public ElectionNotNeededException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java b/clients/src/main/java/org/apache/kafka/common/errors/EligibleLeadersNotAvailableException.java
similarity index 64%
copy from clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
copy to clients/src/main/java/org/apache/kafka/common/errors/EligibleLeadersNotAvailableException.java
index 80b0097..8767965 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/EligibleLeadersNotAvailableException.java
@@ -14,18 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.common.errors;
 
-package org.apache.kafka.clients.admin;
+public class EligibleLeadersNotAvailableException extends InvalidMetadataException {
 
-import org.apache.kafka.common.annotation.InterfaceStability;
+    public EligibleLeadersNotAvailableException(String message) {
+        super(message);
+    }
 
-import java.util.Collection;
-
-/**
- * Options for {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}.
- *
- * The API of this class is evolving, see {@link AdminClient} for details.
- */
-@InterfaceStability.Evolving
-public class ElectPreferredLeadersOptions extends AbstractOptions<ElectPreferredLeadersOptions> {
+    public EligibleLeadersNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6a16578..849f268 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -24,8 +24,8 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
-import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -190,8 +190,8 @@ public enum ApiKeys {
     EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
     DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
     DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()),
-    ELECT_PREFERRED_LEADERS(43, "ElectPreferredLeaders", ElectPreferredLeadersRequestData.SCHEMAS,
-            ElectPreferredLeadersResponseData.SCHEMAS),
+    ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS,
+            ElectLeadersResponseData.SCHEMAS),
     INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS,
                               IncrementalAlterConfigsResponseData.SCHEMAS);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 1d056e3..7e39f69 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -62,6 +62,8 @@ import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.LogDirNotFoundException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.MemberIdRequiredException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
@@ -306,7 +308,10 @@ public enum Errors {
     GROUP_MAX_SIZE_REACHED(81, "The consumer group has reached its max size.", GroupMaxSizeReachedException::new),
     FENCED_INSTANCE_ID(82, "The broker rejected this static consumer since " +
             "another consumer with the same group.instance.id has registered with a different member.id.",
-            FencedInstanceIdException::new);
+            FencedInstanceIdException::new),
+    ELIGIBLE_LEADERS_NOT_AVAILABLE(83, "Eligible topic partition leaders are not available",
+            EligibleLeadersNotAvailableException::new),
+    ELECTION_NOT_NEEDED(84, "Leader election not needed for topic partition", ElectionNotNeededException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index c199f8e..c8ff90d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -229,8 +229,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new DescribeDelegationTokenRequest(struct, apiVersion);
             case DELETE_GROUPS:
                 return new DeleteGroupsRequest(struct, apiVersion);
-            case ELECT_PREFERRED_LEADERS:
-                return new ElectPreferredLeadersRequest(struct, apiVersion);
+            case ELECT_LEADERS:
+                return new ElectLeadersRequest(struct, apiVersion);
             case INCREMENTAL_ALTER_CONFIGS:
                 return new IncrementalAlterConfigsRequest(struct, apiVersion);
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 32402e4..6d07431 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -156,8 +156,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new DescribeDelegationTokenResponse(struct);
             case DELETE_GROUPS:
                 return new DeleteGroupsResponse(struct);
-            case ELECT_PREFERRED_LEADERS:
-                return new ElectPreferredLeadersResponse(struct, version);
+            case ELECT_LEADERS:
+                return new ElectLeadersResponse(struct, version);
             case INCREMENTAL_ALTER_CONFIGS:
                 return new IncrementalAlterConfigsResponse(struct, version);
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index dad21b3..6cb09f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -50,6 +50,10 @@ public class ApiError {
         message = struct.getOrElse(ERROR_MESSAGE, null);
     }
 
+    public ApiError(Errors error) {
+        this(error, error.message());
+    }
+
     public ApiError(Errors error, String message) {
         this.error = error;
         this.message = message;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
new file mode 100644
index 0000000..025733e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+public class ElectLeadersRequest extends AbstractRequest {
+    public static class Builder extends AbstractRequest.Builder<ElectLeadersRequest> {
+        private final ElectionType electionType;
+        private final Collection<TopicPartition> topicPartitions;
+        private final int timeoutMs;
+
+        public Builder(ElectionType electionType, Collection<TopicPartition> topicPartitions, int timeoutMs) {
+            super(ApiKeys.ELECT_LEADERS);
+            this.electionType = electionType;
+            this.topicPartitions = topicPartitions;
+            this.timeoutMs = timeoutMs;
+        }
+
+        @Override
+        public ElectLeadersRequest build(short version) {
+            return new ElectLeadersRequest(toRequestData(version), version);
+        }
+
+        @Override
+        public String toString() {
+            return "ElectLeadersRequest("
+                + "electionType=" + electionType
+                + ", topicPartitions=" + ((topicPartitions == null) ? "null" : MessageUtil.deepToString(topicPartitions.iterator()))
+                + ", timeoutMs=" + timeoutMs
+                + ")";
+        }
+
+        /**
+         * Builds the request payload for {@code version}; version 0 can only express a
+         * PREFERRED election, and a null partition collection is passed through as null.
+         */
+        private ElectLeadersRequestData toRequestData(short version) {
+            if (version == 0 && electionType != ElectionType.PREFERRED) {
+                throw new UnsupportedVersionException("API Version 0 only supports PREFERRED election type");
+            }
+            ElectLeadersRequestData request = new ElectLeadersRequestData().setTimeoutMs(timeoutMs);
+            if (topicPartitions == null) {
+                request.setTopicPartitions(null);
+            } else {
+                Map<String, List<Integer>> byTopic = CollectionUtils.groupPartitionsByTopic(topicPartitions);
+                byTopic.forEach((topic, partitionIds) ->
+                    request.topicPartitions().add(new TopicPartitions().setTopic(topic).setPartitionId(partitionIds)));
+            }
+            request.setElectionType(electionType.value);
+
+            return request;
+        }
+    }
+
+    private final ElectLeadersRequestData data;
+
+    private ElectLeadersRequest(ElectLeadersRequestData data, short version) {
+        super(ApiKeys.ELECT_LEADERS, version);
+        this.data = data;
+    }
+
+    public ElectLeadersRequest(Struct struct, short version) {
+        super(ApiKeys.ELECT_LEADERS, version);
+        this.data = new ElectLeadersRequestData(struct, version);
+    }
+
+    public ElectLeadersRequestData data() {
+        return data;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ApiError apiError = ApiError.fromThrowable(e);
+        List<ReplicaElectionResult> electionResults = new ArrayList<>();
+        // The partition list can be null (Builder#toRequestData sets it so when no partitions
+        // are given); iterating it unguarded would throw while building the error response.
+        if (data.topicPartitions() != null) {
+            for (TopicPartitions topic : data.topicPartitions()) {
+                ReplicaElectionResult electionResult = new ReplicaElectionResult();
+                electionResult.setTopic(topic.topic());
+                for (Integer partitionId : topic.partitionId()) {
+                    PartitionResult partitionResult = new PartitionResult();
+                    partitionResult.setPartitionId(partitionId);
+                    partitionResult.setErrorCode(apiError.error().code());
+                    partitionResult.setErrorMessage(apiError.message());
+                    electionResult.partitionResult().add(partitionResult);
+                }
+                electionResults.add(electionResult);
+            }
+        }
+
+        return new ElectLeadersResponse(throttleTimeMs, apiError.error().code(), electionResults, version());
+    }
+
+    public static ElectLeadersRequest parse(ByteBuffer buffer, short version) {
+        return new ElectLeadersRequest(ApiKeys.ELECT_LEADERS.parseRequest(version, buffer), version);
+    }
+
+    @Override
+    protected Struct toStruct() {
+        return data.toStruct(version());
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersResponse.java
new file mode 100644
index 0000000..b085204
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersResponse.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class ElectLeadersResponse extends AbstractResponse {
+
+    private final short version;
+    private final ElectLeadersResponseData data;
+
+    public ElectLeadersResponse(Struct struct) {
+        this(struct, ApiKeys.ELECT_LEADERS.latestVersion());
+    }
+
+    public ElectLeadersResponse(Struct struct, short version) {
+        this.version = version;
+        this.data = new ElectLeadersResponseData(struct, version);
+    }
+
+    public ElectLeadersResponse(
+            int throttleTimeMs,
+            short errorCode,
+            List<ReplicaElectionResult> electionResults) {
+        this(throttleTimeMs, errorCode, electionResults, ApiKeys.ELECT_LEADERS.latestVersion());
+    }
+
+    public ElectLeadersResponse(
+            int throttleTimeMs,
+            short errorCode,
+            List<ReplicaElectionResult> electionResults,
+            short version) {
+
+        this.version = version;
+        this.data = new ElectLeadersResponseData();
+
+        data.setThrottleTimeMs(throttleTimeMs);
+
+        if (version >= 1) {
+            data.setErrorCode(errorCode);
+        }
+
+        data.setReplicaElectionResults(electionResults);
+    }
+
+    public ElectLeadersResponseData data() {
+        return data;
+    }
+
+    public short version() {
+        return version;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        HashMap<Errors, Integer> counts = new HashMap<>();
+        // The top-level error is reported apart from the partition results, so count it too.
+        if (data.errorCode() != Errors.NONE.code()) {
+            counts.merge(Errors.forCode(data.errorCode()), 1, Integer::sum);
+        }
+        data.replicaElectionResults().forEach(result -> result.partitionResult().forEach(
+            partitionResult -> counts.merge(Errors.forCode(partitionResult.errorCode()), 1, Integer::sum)));
+        return counts;
+    }
+
+    public static ElectLeadersResponse parse(ByteBuffer buffer, short version) {
+        return new ElectLeadersResponse(ApiKeys.ELECT_LEADERS.responseSchema(version).read(buffer), version);
+    }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return true;
+    }
+
+    /**
+     * Flattens the response into one entry per topic partition: empty on success, otherwise the
+     * exception for the partition's error code, built with the partition's error message.
+     */
+    public static Map<TopicPartition, Optional<Throwable>> electLeadersResult(ElectLeadersResponseData data) {
+        Map<TopicPartition, Optional<Throwable>> outcomes = new HashMap<>();
+        for (ReplicaElectionResult topic : data.replicaElectionResults()) {
+            for (PartitionResult partition : topic.partitionResult()) {
+                Errors error = Errors.forCode(partition.errorCode());
+                Optional<Throwable> failure = error == Errors.NONE
+                        ? Optional.empty()
+                        : Optional.of(error.exception(partition.errorMessage()));
+                outcomes.put(new TopicPartition(topic.topic(), partition.partitionId()), failure);
+            }
+        }
+
+        return outcomes;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java
deleted file mode 100644
index ab96e3b..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
-import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ElectPreferredLeadersRequest extends AbstractRequest {
-    public static class Builder extends AbstractRequest.Builder<ElectPreferredLeadersRequest> {
-        private final ElectPreferredLeadersRequestData data;
-
-        public Builder(ElectPreferredLeadersRequestData data) {
-            super(ApiKeys.ELECT_PREFERRED_LEADERS);
-            this.data = data;
-        }
-
-        @Override
-        public ElectPreferredLeadersRequest build(short version) {
-            return new ElectPreferredLeadersRequest(data, version);
-        }
-
-        @Override
-        public String toString() {
-            return data.toString();
-        }
-    }
-
-    public static ElectPreferredLeadersRequestData toRequestData(Collection<TopicPartition> partitions, int timeoutMs) {
-        ElectPreferredLeadersRequestData d = new ElectPreferredLeadersRequestData()
-                .setTimeoutMs(timeoutMs);
-        if (partitions != null) {
-            for (Map.Entry<String, List<Integer>> tp : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) {
-                d.topicPartitions().add(new ElectPreferredLeadersRequestData.TopicPartitions().setTopic(tp.getKey()).setPartitionId(tp.getValue()));
-            }
-        } else {
-            d.setTopicPartitions(null);
-        }
-        return d;
-    }
-
-    public static Map<TopicPartition, ApiError> fromResponseData(ElectPreferredLeadersResponseData data) {
-        Map<TopicPartition, ApiError> map = new HashMap<>();
-        for (ElectPreferredLeadersResponseData.ReplicaElectionResult topicResults : data.replicaElectionResults()) {
-            for (ElectPreferredLeadersResponseData.PartitionResult partitionResult : topicResults.partitionResult()) {
-                map.put(new TopicPartition(topicResults.topic(), partitionResult.partitionId()),
-                        new ApiError(Errors.forCode(partitionResult.errorCode()),
-                                partitionResult.errorMessage()));
-            }
-        }
-        return map;
-    }
-
-    private final ElectPreferredLeadersRequestData data;
-    private final short version;
-
-    private ElectPreferredLeadersRequest(ElectPreferredLeadersRequestData data, short version) {
-        super(ApiKeys.ELECT_PREFERRED_LEADERS, version);
-        this.data = data;
-        this.version = version;
-    }
-
-    public ElectPreferredLeadersRequest(Struct struct, short version) {
-        super(ApiKeys.ELECT_PREFERRED_LEADERS, version);
-        this.data = new ElectPreferredLeadersRequestData(struct, version);
-        this.version = version;
-    }
-
-    public ElectPreferredLeadersRequestData data() {
-        return data;
-    }
-
-    @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        ElectPreferredLeadersResponseData response = new ElectPreferredLeadersResponseData();
-        response.setThrottleTimeMs(throttleTimeMs);
-        ApiError apiError = ApiError.fromThrowable(e);
-        for (TopicPartitions topic : data.topicPartitions()) {
-            ReplicaElectionResult electionResult = new ReplicaElectionResult().setTopic(topic.topic());
-            for (Integer partitionId : topic.partitionId()) {
-                electionResult.partitionResult().add(new ElectPreferredLeadersResponseData.PartitionResult()
-                        .setPartitionId(partitionId)
-                        .setErrorCode(apiError.error().code())
-                        .setErrorMessage(apiError.message()));
-            }
-            response.replicaElectionResults().add(electionResult);
-        }
-        return new ElectPreferredLeadersResponse(response);
-    }
-
-    public static ElectPreferredLeadersRequest parse(ByteBuffer buffer, short version) {
-        return new ElectPreferredLeadersRequest(ApiKeys.ELECT_PREFERRED_LEADERS.parseRequest(version, buffer), version);
-    }
-
-    /**
-     * Visible for testing.
-     */
-    @Override
-    public Struct toStruct() {
-        return data.toStruct(version);
-    }
-}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java
deleted file mode 100644
index c168c67..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-public class ElectPreferredLeadersResponse extends AbstractResponse {
-
-    private final ElectPreferredLeadersResponseData data;
-
-    public ElectPreferredLeadersResponse(ElectPreferredLeadersResponseData data) {
-        this.data = data;
-    }
-
-    public ElectPreferredLeadersResponse(Struct struct, short version) {
-        this.data = new ElectPreferredLeadersResponseData(struct, version);
-    }
-
-    public ElectPreferredLeadersResponse(Struct struct) {
-        short latestVersion = (short) (ElectPreferredLeadersResponseData.SCHEMAS.length - 1);
-        this.data = new ElectPreferredLeadersResponseData(struct, latestVersion);
-    }
-
-    public ElectPreferredLeadersResponseData data() {
-        return data;
-    }
-
-    @Override
-    protected Struct toStruct(short version) {
-        return data.toStruct(version);
-    }
-
-    @Override
-    public int throttleTimeMs() {
-        return data.throttleTimeMs();
-    }
-
-    @Override
-    public Map<Errors, Integer> errorCounts() {
-        HashMap<Errors, Integer> counts = new HashMap<>();
-        for (ReplicaElectionResult result : data.replicaElectionResults()) {
-            for (PartitionResult partitionResult : result.partitionResult()) {
-                Errors error = Errors.forCode(partitionResult.errorCode());
-                counts.put(error, counts.getOrDefault(error, 0) + 1);
-            }
-        }
-        return counts;
-    }
-
-    public static ElectPreferredLeadersResponse parse(ByteBuffer buffer, short version) {
-        return new ElectPreferredLeadersResponse(
-                ApiKeys.ELECT_PREFERRED_LEADERS.responseSchema(version).read(buffer), version);
-    }
-
-    @Override
-    public boolean shouldClientThrottle(short version) {
-        return version >= 3;
-    }
-}
\ No newline at end of file
diff --git a/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json
similarity index 73%
rename from clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json
rename to clients/src/main/resources/common/message/ElectLeadersRequest.json
index da2a423..b7ed5dc 100644
--- a/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json
+++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json
@@ -16,17 +16,20 @@
 {
   "apiKey": 43,
   "type": "request",
-  "name": "ElectPreferredLeadersRequest",
-  "validVersions": "0",
+  "name": "ElectLeadersRequest",
+  "validVersions": "0-1",
   "fields": [
+    { "name": "ElectionType", "type": "int8", "versions": "1+",
+      "about": "Type of elections to conduct for the partition. A value of '0' elects the preferred replica. A value of '1' elects the first live replica if there are no in-sync replicas." },
     { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
-      "about": "The topic partitions to elect the preferred leader of.",
+      "about": "The topic partitions to elect leaders for.",
       "fields": [
         { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
           "about": "The name of a topic." },
         { "name": "PartitionId", "type": "[]int32", "versions": "0+",
-          "about": "The partitions of this topic whose preferred leader should be elected" }
-      ]},
+          "about": "The partitions of this topic whose leader should be elected." }
+      ]
+    },
     { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
       "about": "The time in ms to wait for the election to complete." }
   ]
diff --git a/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json b/clients/src/main/resources/common/message/ElectLeadersResponse.json
similarity index 91%
rename from clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json
rename to clients/src/main/resources/common/message/ElectLeadersResponse.json
index 637b2c1..09d0e15 100644
--- a/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json
+++ b/clients/src/main/resources/common/message/ElectLeadersResponse.json
@@ -16,11 +16,13 @@
 {
   "apiKey": 43,
   "type": "response",
-  "name": "ElectPreferredLeadersResponse",
-  "validVersions": "0",
+  "name": "ElectLeadersResponse",
+  "validVersions": "0-1",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "1+", "ignorable": false,
+      "about": "The top level response error code." },
     { "name": "ReplicaElectionResults", "type": "[]ReplicaElectionResult", "versions": "0+",
       "about": "The election results, or an empty array if the requester did not have permission and the request asks for all partitions.", "fields": [
       { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index e2341fb..aa8749c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
@@ -51,26 +52,25 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
-import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
-import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
-import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
@@ -78,7 +78,7 @@ import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.DescribeGroupsResponse;
-import org.apache.kafka.common.requests.ElectPreferredLeadersResponse;
+import org.apache.kafka.common.requests.ElectLeadersResponse;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
 import org.apache.kafka.common.requests.ListGroupsResponse;
@@ -123,6 +123,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 /**
@@ -675,51 +676,59 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testElectPreferredLeaders()  throws Exception {
+    public void testElectLeaders()  throws Exception {
         TopicPartition topic1 = new TopicPartition("topic", 0);
         TopicPartition topic2 = new TopicPartition("topic", 2);
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-
-            // Test a call where one partition has an error.
-            ApiError value = ApiError.fromThrowable(new ClusterAuthorizationException(null));
-            ElectPreferredLeadersResponseData responseData = new ElectPreferredLeadersResponseData();
-            ReplicaElectionResult r = new ReplicaElectionResult().setTopic(topic1.topic());
-            r.partitionResult().add(new PartitionResult()
-                    .setPartitionId(topic1.partition())
-                    .setErrorCode(ApiError.NONE.error().code())
-                    .setErrorMessage(ApiError.NONE.message()));
-            r.partitionResult().add(new PartitionResult()
-                    .setPartitionId(topic2.partition())
-                    .setErrorCode(value.error().code())
-                    .setErrorMessage(value.message()));
-            responseData.replicaElectionResults().add(r);
-            env.kafkaClient().prepareResponse(new ElectPreferredLeadersResponse(responseData));
-            ElectPreferredLeadersResult results = env.adminClient().electPreferredLeaders(asList(topic1, topic2));
-            results.partitionResult(topic1).get();
-            TestUtils.assertFutureError(results.partitionResult(topic2), ClusterAuthorizationException.class);
-            TestUtils.assertFutureError(results.all(), ClusterAuthorizationException.class);
-
-            // Test a call where there are no errors.
-            r.partitionResult().clear();
-            r.partitionResult().add(new PartitionResult()
-                    .setPartitionId(topic1.partition())
-                    .setErrorCode(ApiError.NONE.error().code())
-                    .setErrorMessage(ApiError.NONE.message()));
-            r.partitionResult().add(new PartitionResult()
-                    .setPartitionId(topic2.partition())
-                    .setErrorCode(ApiError.NONE.error().code())
-                    .setErrorMessage(ApiError.NONE.message()));
-            env.kafkaClient().prepareResponse(new ElectPreferredLeadersResponse(responseData));
-
-            results = env.adminClient().electPreferredLeaders(asList(topic1, topic2));
-            results.partitionResult(topic1).get();
-            results.partitionResult(topic2).get();
-
-            // Now try a timeout
-            results = env.adminClient().electPreferredLeaders(asList(topic1, topic2), new ElectPreferredLeadersOptions().timeoutMs(100));
-            TestUtils.assertFutureError(results.partitionResult(topic1), TimeoutException.class);
-            TestUtils.assertFutureError(results.partitionResult(topic2), TimeoutException.class);
+            for (ElectionType electionType : ElectionType.values()) {
+                env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+                // Test a call where one partition has an error.
+                ApiError value = ApiError.fromThrowable(new ClusterAuthorizationException(null));
+                List<ReplicaElectionResult> electionResults = new ArrayList<>();
+                ReplicaElectionResult electionResult = new ReplicaElectionResult();
+                electionResult.setTopic(topic1.topic());
+                // Add partition 1 result
+                PartitionResult partition1Result = new PartitionResult();
+                partition1Result.setPartitionId(topic1.partition());
+                partition1Result.setErrorCode(value.error().code());
+                partition1Result.setErrorMessage(value.message());
+                electionResult.partitionResult().add(partition1Result);
+
+                // Add partition 2 result
+                PartitionResult partition2Result = new PartitionResult();
+                partition2Result.setPartitionId(topic2.partition());
+                partition2Result.setErrorCode(value.error().code());
+                partition2Result.setErrorMessage(value.message());
+                electionResult.partitionResult().add(partition2Result);
+
+                electionResults.add(electionResult);
+
+                env.kafkaClient().prepareResponse(new ElectLeadersResponse(0, Errors.NONE.code(), electionResults));
+                ElectLeadersResult results = env.adminClient().electLeaders(
+                        electionType,
+                        new HashSet<>(asList(topic1, topic2)));
+                assertEquals(results.partitions().get().get(topic2).get().getClass(), ClusterAuthorizationException.class);
+
+                // Test a call where there are no errors, by mutating the partition results already held in electionResults
+                partition1Result.setErrorCode(ApiError.NONE.error().code());
+                partition1Result.setErrorMessage(ApiError.NONE.message());
+
+                partition2Result.setErrorCode(ApiError.NONE.error().code());
+                partition2Result.setErrorMessage(ApiError.NONE.message());
+
+                env.kafkaClient().prepareResponse(new ElectLeadersResponse(0, Errors.NONE.code(), electionResults));
+                results = env.adminClient().electLeaders(electionType, new HashSet<>(asList(topic1, topic2)));
+                assertFalse(results.partitions().get().get(topic1).isPresent());
+                assertFalse(results.partitions().get().get(topic2).isPresent());
+
+                // Now try a timeout
+                results = env.adminClient().electLeaders(
+                        electionType,
+                        new HashSet<>(asList(topic1, topic2)),
+                        new ElectLeadersOptions().timeoutMs(100));
+                TestUtils.assertFutureError(results.partitions(), TimeoutException.class);
+            }
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 9709372..19f9eae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
@@ -333,11 +334,21 @@ public class MockAdminClient extends AdminClient {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Deprecated
+    @Override
     public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions, ElectPreferredLeadersOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
+    public ElectLeadersResult electLeaders(
+            ElectionType electionType,
+            Set<TopicPartition> partitions,
+            ElectLeadersOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
     public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 32f3305..e06202b 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
@@ -33,28 +34,30 @@ import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ControlledShutdownRequestData;
-import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
-import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
-import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
-import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
-import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -67,11 +70,6 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
-import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
-import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -346,10 +344,10 @@ public class RequestResponseTest {
         checkRequest(createRenewTokenRequest(), true);
         checkErrorResponse(createRenewTokenRequest(), new UnknownServerException(), true);
         checkResponse(createRenewTokenResponse(), 0, true);
-        checkRequest(createElectPreferredLeadersRequest(), true);
-        checkRequest(createElectPreferredLeadersRequestNullPartitions(), true);
-        checkErrorResponse(createElectPreferredLeadersRequest(), new UnknownServerException(), true);
-        checkResponse(createElectPreferredLeadersResponse(), 0, true);
+        checkRequest(createElectLeadersRequest(), true);
+        checkRequest(createElectLeadersRequestNullPartitions(), true);
+        checkErrorResponse(createElectLeadersRequest(), new UnknownServerException(), true);
+        checkResponse(createElectLeadersResponse(), 1, true);
         checkRequest(createIncrementalAlterConfigsRequest(), true);
         checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException(), true);
         checkResponse(createIncrementalAlterConfigsResponse(), 0, true);
@@ -1515,32 +1513,36 @@ public class RequestResponseTest {
         return new DescribeDelegationTokenResponse(20, Errors.NONE, tokenList);
     }
 
-    private ElectPreferredLeadersRequest createElectPreferredLeadersRequestNullPartitions() {
-        return new ElectPreferredLeadersRequest.Builder(
-                new ElectPreferredLeadersRequestData()
-                        .setTimeoutMs(100)
-                        .setTopicPartitions(null))
-                .build((short) 0);
+    private ElectLeadersRequest createElectLeadersRequestNullPartitions() {
+        return new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 100).build((short) 1);
     }
 
-    private ElectPreferredLeadersRequest createElectPreferredLeadersRequest() {
-        ElectPreferredLeadersRequestData data = new ElectPreferredLeadersRequestData()
-                .setTimeoutMs(100);
-        data.topicPartitions().add(new TopicPartitions().setTopic("data").setPartitionId(asList(1, 2)));
-        return new ElectPreferredLeadersRequest.Builder(data).build((short) 0);
+    private ElectLeadersRequest createElectLeadersRequest() {
+        List<TopicPartition> partitions = asList(new TopicPartition("data", 1), new TopicPartition("data", 2));
+
+        return new ElectLeadersRequest.Builder(ElectionType.PREFERRED, partitions, 100).build((short) 1);
     }
 
-    private ElectPreferredLeadersResponse createElectPreferredLeadersResponse() {
-        ElectPreferredLeadersResponseData data = new ElectPreferredLeadersResponseData().setThrottleTimeMs(200);
-        ReplicaElectionResult resultsByTopic = new ReplicaElectionResult().setTopic("myTopic");
-        resultsByTopic.partitionResult().add(new PartitionResult().setPartitionId(0)
-                .setErrorCode(Errors.NONE.code())
-                .setErrorMessage(Errors.NONE.message()));
-        resultsByTopic.partitionResult().add(new PartitionResult().setPartitionId(1)
-                .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-                .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()));
-        data.replicaElectionResults().add(resultsByTopic);
-        return new ElectPreferredLeadersResponse(data);
+    private ElectLeadersResponse createElectLeadersResponse() {
+        String topic = "myTopic";
+        List<ReplicaElectionResult> electionResults = new ArrayList<>();
+        ReplicaElectionResult electionResult = new ReplicaElectionResult();
+        electionResult.setTopic(topic);
+        // Add partition 1 result
+        PartitionResult partitionResult = new PartitionResult();
+        partitionResult.setPartitionId(0);
+        partitionResult.setErrorCode(ApiError.NONE.error().code());
+        partitionResult.setErrorMessage(ApiError.NONE.message());
+        electionResult.partitionResult().add(partitionResult);
+
+        // Add partition 2 result
+        partitionResult = new PartitionResult();
+        partitionResult.setPartitionId(1);
+        partitionResult.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
+        partitionResult.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
+        electionResult.partitionResult().add(partitionResult);
+
+        return new ElectLeadersResponse(200, Errors.NONE.code(), electionResults);
     }
 
     private IncrementalAlterConfigsRequest createIncrementalAlterConfigsRequest() {
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 9fe0451..5950b71 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -38,7 +38,7 @@ object Kafka extends Logging {
     // fact that this class ignores the first parameter which is interpreted as positional and mandatory
     // but would not be mandatory if --version is specified
     // This is a bit of an ugly crutch till we get a chance to rework the entire command line parsing
-    val versionOpt = optionParser.accepts("version", "Print version information and exit.")
+    optionParser.accepts("version", "Print version information and exit.")
 
     if (args.length == 0 || args.contains("--help")) {
       CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
new file mode 100644
index 0000000..58a55e4
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+import joptsimple.util.EnumConverter
+import kafka.common.AdminCommandFailedException
+import kafka.utils.CommandDefaultOptions
+import kafka.utils.CommandLineUtils
+import kafka.utils.CoreUtils
+import kafka.utils.Json
+import kafka.utils.Logging
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.admin.{AdminClient => JAdminClient}
+import org.apache.kafka.common.ElectionType
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.errors.ElectionNotNeededException
+import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.utils.Utils
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.duration._
+
+object LeaderElectionCommand extends Logging {
+  def main(args: Array[String]): Unit = {
+    run(args, 30.second)
+  }
+
+  def run(args: Array[String], timeout: Duration): Unit = {
+    val commandOptions = new LeaderElectionCommandOptions(args)
+    CommandLineUtils.printHelpAndExitIfNeeded(
+      commandOptions,
+      "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
+    )
+
+    val electionType = commandOptions.options.valueOf(commandOptions.electionType)
+
+    val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path  =>
+      parseReplicaElectionData(Utils.readFileAsString(path))
+    }
+
+    val singleTopicPartition = (
+      Option(commandOptions.options.valueOf(commandOptions.topic)),
+      Option(commandOptions.options.valueOf(commandOptions.partition))
+    ) match {
+      case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
+      case _ => None
+    }
+
+    /* Note: No need to look at --all-topic-partitions as we want this to be None if it is used.
+     * Jopt-Simple validates that this option is required unless --topic or --path-to-json-file is given.
+     */
+    val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)
+
+    val adminClient = {
+      val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
+        Utils.loadProps(config)
+      }.getOrElse(new Properties())
+
+      props.setProperty(
+        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+        commandOptions.options.valueOf(commandOptions.bootstrapServer)
+      )
+      props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
+
+      JAdminClient.create(props)
+    }
+
+    try {
+      electLeaders(adminClient, electionType, topicPartitions)
+    } finally {
+      adminClient.close()
+    }
+  }
+
+  /** Reads the topic partitions under the "partitions" field; throws AdminOperationException if nothing parses, the field is absent or an entry repeats. */
+  private[this] def parseReplicaElectionData(jsonString: String): Set[TopicPartition] = {
+    Json.parseFull(jsonString) match {
+      case Some(js) =>
+        js.asJsonObject.get("partitions") match {
+          case Some(partitionsList) =>
+            val partitionsRaw = partitionsList.asJsonArray.iterator.map(_.asJsonObject)
+            val partitions = partitionsRaw.map { p =>
+              val topic = p("topic").to[String]
+              val partition = p("partition").to[Int]
+              new TopicPartition(topic, partition)
+            }.toBuffer
+            // Look for duplicates on the buffer: converting to a Set first would silently drop them.
+            val duplicatePartitions = CoreUtils.duplicates(partitions)
+            if (duplicatePartitions.nonEmpty) {
+              throw new AdminOperationException(s"Replica election data contains duplicate partitions: ${duplicatePartitions.mkString(",")}")
+            }
+            partitions.toSet
+          case None => throw new AdminOperationException("Replica election data is missing \"partitions\" field")
+        }
+      case None => throw new AdminOperationException("Replica election data is empty")
+    }
+  }
+
+  /** Runs the election through the admin client, prints each partition's outcome and throws AdminCommandFailedException if any election failed. */
+  private[this] def electLeaders(
+    client: JAdminClient,
+    electionType: ElectionType,
+    topicPartitions: Option[Set[TopicPartition]]
+  ): Unit = {
+    val electionResults = try {
+      val partitions = topicPartitions.map(_.asJava).orNull
+      debug(s"Calling AdminClient.electLeaders($electionType, $partitions)")
+      client.electLeaders(electionType, partitions).partitions.get.asScala
+    } catch {
+      case e: ExecutionException =>
+        e.getCause match {
+          case cause: TimeoutException =>
+            val message = "Timeout waiting for election results"
+            println(message)
+            throw new AdminCommandFailedException(message, cause)
+          case cause: ClusterAuthorizationException =>
+            val message = "Not authorized to perform leader election"
+            println(message)
+            throw new AdminCommandFailedException(message, cause)
+          case _ => throw e
+        }
+      case e: Throwable =>
+        println("Error while making request")
+        throw e
+    }
+    // Bucket each partition: no error = elected, ElectionNotNeededException = no-op, anything else = failure.
+    val succeeded = mutable.Set.empty[TopicPartition]
+    val noop = mutable.Set.empty[TopicPartition]
+    val failed = mutable.Map.empty[TopicPartition, Throwable]
+
+    electionResults.foreach { case (topicPartition, error) =>
+      val _: Unit = if (error.isPresent) {
+        error.get match {
+          case _: ElectionNotNeededException => noop += topicPartition
+          case _ => failed += topicPartition -> error.get
+        }
+      } else {
+        succeeded += topicPartition
+      }
+    }
+
+    if (succeeded.nonEmpty) {
+      val partitions = succeeded.mkString(", ")
+      println(s"Successfully completed leader election ($electionType) for partitions $partitions")
+    }
+    // The no-op report must list the `noop` set; those partitions are never added to `succeeded`.
+    if (noop.nonEmpty) {
+      val partitions = noop.mkString(", ")
+      println(s"Valid replica already elected for partitions $partitions")
+    }
+
+    if (failed.nonEmpty) {
+      val rootException = new AdminCommandFailedException(s"${failed.size} replica(s) could not be elected")
+      failed.foreach { case (topicPartition, exception) =>
+        println(s"Error completing leader election ($electionType) for partition: $topicPartition: $exception")
+        rootException.addSuppressed(exception)
+      }
+      throw rootException
+    }
+  }
+}
+
+private final class LeaderElectionCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
+  val bootstrapServer = parser
+    .accepts(
+      "bootstrap-server",
+      "A hostname and port for the broker to connect to, in the form host:port. Multiple comma separated URLs can be given. REQUIRED.")
+    .withRequiredArg
+    .required
+    .describedAs("host:port")
+    .ofType(classOf[String])
+  val adminClientConfig = parser
+    .accepts(
+      "admin.config",
+      "Configuration properties files to pass to the admin client")
+    .withRequiredArg
+    .describedAs("config file")
+    .ofType(classOf[String])
+
+  val pathToJsonFile = parser
+    .accepts(
+      "path-to-json-file",
+      "The JSON file with the list  of partition for which leader elections should be performed. This is an example format. \n{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\nNot allowed if --all-topic-partitions or --topic flags are specified.")
+    .withRequiredArg
+    .describedAs("Path to JSON file")
+    .ofType(classOf[String])
+
+  val topic = parser
+    .accepts(
+      "topic",
+      "Name of topic for which to perform an election. Not allowed if --path-to-json-file or --all-topic-partitions is specified.")
+    .availableUnless("path-to-json-file")
+    .withRequiredArg
+    .describedAs("topic name")
+    .ofType(classOf[String])
+  val partition = parser
+    .accepts(
+      "partition",
+      "Partition id for which to perform an election. REQUIRED if --topic is specified.")
+    .requiredIf("topic")
+    .withRequiredArg
+    .describedAs("partition id")
+    .ofType(classOf[Integer])
+
+  val allTopicPartitions = parser
+    .accepts(
+      "all-topic-partitions",
+      "Perform election on all of the eligible topic partitions based on the type of election (see the --election-type flag). Not allowed if --topic or --path-to-json-file is specified.")
+    .requiredUnless("path-to-json-file", "topic")
+
+  val electionType = parser
+    .accepts(
+      "election-type",
+      "Type of election to attempt. Possible values are \"preferred\" for preferred leader election or \"unclean\" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.")
+    .withRequiredArg
+    .required
+    .describedAs("election type")
+    .withValuesConvertedBy(ElectionTypeConverter)
+
+  options = parser.parse(args: _*)
+}
+
+final object ElectionTypeConverter extends EnumConverter[ElectionType](classOf[ElectionType]) { }
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 13b65db..15242f7 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -16,32 +16,34 @@
  */
 package kafka.admin
 
+import collection.JavaConverters._
+import collection._
 import java.util.Properties
 import java.util.concurrent.ExecutionException
-
 import joptsimple.OptionSpecBuilder
 import kafka.common.AdminCommandFailedException
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.common.ElectionType
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.errors.ElectionNotNeededException
 import org.apache.kafka.common.errors.TimeoutException
-
-import collection.JavaConverters._
-import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.{KafkaFuture, TopicPartition}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.Utils
 import org.apache.zookeeper.KeeperException.NodeExistsException
 
-import collection._
-
 object PreferredReplicaLeaderElectionCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
-
     val timeout = 30000
     run(args, timeout)
   }
+
   def run(args: Array[String], timeout: Int = 30000): Unit = {
+    println("This tool is deprecated. Please use kafka-leader-election tool. Tracking issue: KAFKA-8405")
     val commandOpts = new PreferredReplicaLeaderElectionCommandOptions(args)
     CommandLineUtils.printHelpAndExitIfNeeded(commandOpts, "This tool helps to causes leadership for each partition to be transferred back to the 'preferred replica'," +
       " it can be used to balance leadership among the servers.")
@@ -209,71 +211,68 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
 
     val adminClient = org.apache.kafka.clients.admin.AdminClient.create(adminClientProps)
 
-    /**
-      * Wait until the given future has completed, then return whether it completed exceptionally.
-      * Because KafkaFuture.isCompletedExceptionally doesn't wait for a result
-      */
-    private def completedExceptionally[T](future: KafkaFuture[T]): Boolean = {
-      try {
-        future.get()
-        false
-      } catch {
-        case (_: Throwable) =>
-          true
-      }
-    }
-
     override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]): Unit = {
       val partitions = partitionsFromUser match {
         case Some(partitionsFromUser) => partitionsFromUser.asJava
         case None => null
       }
-      debug(s"Calling AdminClient.electPreferredLeaders($partitions)")
-      val result = adminClient.electPreferredLeaders(partitions)
-      // wait for all results
-
-      val attemptedPartitions = partitionsFromUser match {
-        case Some(partitionsFromUser) => partitions.asScala
-        case None => try {
-          result.partitions().get.asScala
-        } catch {
-          case e: ExecutionException =>
-            val cause = e.getCause
-            if (cause.isInstanceOf[TimeoutException]) {
-              // We timed out, or don't even know the attempted partitions
-              println("Timeout waiting for election results")
-            }
-            throw new AdminCommandFailedException(null, cause)
-          case e: Throwable =>
-            // We don't even know the attempted partitions
-            println("Error while making request")
-            e.printStackTrace()
-            return
-        }
-      }
+      debug(s"Calling AdminClient.electLeaders(ElectionType.PREFERRED, $partitions)")
 
-      val (exceptional, ok) = attemptedPartitions.map(tp => tp -> result.partitionResult(tp)).
-        partition { case (_, partitionResult) => completedExceptionally(partitionResult) }
+      val electionResults = try {
+        adminClient.electLeaders(ElectionType.PREFERRED, partitions).partitions.get.asScala
+      } catch {
+        case e: ExecutionException =>
+          val cause = e.getCause
+          if (cause.isInstanceOf[TimeoutException]) {
+            println("Timeout waiting for election results")
+            throw new AdminCommandFailedException("Timeout waiting for election results", cause)
+          } else if (cause.isInstanceOf[ClusterAuthorizationException]) {
+            println(s"Not authorized to perform leader election")
+            throw new AdminCommandFailedException("Not authorized to perform leader election", cause)
+          }
 
-      if (!ok.isEmpty) {
-        println(s"Successfully completed preferred replica election for partitions ${ok.map{ case (tp, future) => tp }.mkString(", ")}")
+          throw e
+        case e: Throwable =>
+          // We don't even know the attempted partitions
+          println("Error while making request")
+          e.printStackTrace()
+          return
       }
-      if (!exceptional.isEmpty) {
-        val adminException = new AdminCommandFailedException(
-          s"${exceptional.size} preferred replica(s) could not be elected")
-        for ((partition, void) <- exceptional) {
-          val exception = try {
-            void.get()
-            new AdminCommandFailedException("Exceptional future with no exception")
-          } catch {
-            case e: ExecutionException => e.getCause
+
+      val succeeded = mutable.Set.empty[TopicPartition]
+      val noop = mutable.Set.empty[TopicPartition]
+      val failed = mutable.Map.empty[TopicPartition, Throwable]
+
+      electionResults.foreach { case (topicPartition, error) =>
+        val _: Unit = if (error.isPresent) {
+          if (error.get.isInstanceOf[ElectionNotNeededException]) {
+            noop += topicPartition
+          } else {
+            failed += topicPartition -> error.get
           }
-          println(s"Error completing preferred replica election for partition $partition: $exception")
-          adminException.addSuppressed(exception)
+        } else {
+          succeeded += topicPartition
         }
-        throw adminException
       }
 
+      if (!succeeded.isEmpty) {
+        val partitions = succeeded.mkString(", ")
+        println(s"Successfully completed preferred leader election for partitions $partitions")
+      }
+
+      if (!noop.isEmpty) {
+        val partitions = noop.mkString(", ") // list the no-op partitions; `succeeded` never contains them
+        println(s"Preferred replica already elected for partitions $partitions")
+      }
+
+      if (!failed.isEmpty) {
+        val rootException = new AdminCommandFailedException(s"${failed.size} preferred replica(s) could not be elected")
+        failed.foreach { case (topicPartition, exception) =>
+          println(s"Error completing preferred leader election for partition: $topicPartition: $exception")
+          rootException.addSuppressed(exception)
+        }
+        throw rootException
+      }
     }
 
     override def close(): Unit = {
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f28a267..996f73b 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -415,7 +415,7 @@ object TopicCommand extends Logging {
     }
 
     override def getTopics(topicWhitelist: Option[String], excludeInternalTopics: Boolean = false): Seq[String] = {
-      val allTopics = zkClient.getAllTopicsInCluster.sorted
+      val allTopics = zkClient.getAllTopicsInCluster.toSeq.sorted
       doGetTopics(allTopics, topicWhitelist, excludeInternalTopics)
     }
 
diff --git a/core/src/main/scala/kafka/api/package.scala b/core/src/main/scala/kafka/api/package.scala
new file mode 100644
index 0000000..aa2fdfd
--- /dev/null
+++ b/core/src/main/scala/kafka/api/package.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import org.apache.kafka.common.ElectionType
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.ElectLeadersRequest
+import scala.collection.JavaConverters._
+import scala.collection.breakOut
+
+package object api {
+  implicit final class ElectLeadersRequestOps(val self: ElectLeadersRequest) extends AnyVal {
+    def topicPartitions: Set[TopicPartition] = {
+      if (self.data.topicPartitions == null) {
+        Set.empty
+      } else {
+        self.data.topicPartitions.asScala.flatMap { topicPartition =>
+          topicPartition.partitionId.asScala.map { partitionId =>
+            new TopicPartition(topicPartition.topic, partitionId)
+          }
+        }(breakOut)
+      }
+    }
+
+    def electionType: ElectionType = {
+      if (self.version == 0) {
+        ElectionType.PREFERRED
+      } else {
+        ElectionType.valueOf(self.data.electionType)
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 256f1a0..a6cce32 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -92,22 +92,22 @@ class DelayedOperations(topicPartition: TopicPartition,
                         deleteRecords: DelayedOperationPurgatory[DelayedDeleteRecords]) {
 
   def checkAndCompleteAll(): Unit = {
-    val requestKey = new TopicPartitionOperationKey(topicPartition)
+    val requestKey = TopicPartitionOperationKey(topicPartition)
     fetch.checkAndComplete(requestKey)
     produce.checkAndComplete(requestKey)
     deleteRecords.checkAndComplete(requestKey)
   }
 
   def checkAndCompleteFetch(): Unit = {
-    fetch.checkAndComplete(new TopicPartitionOperationKey(topicPartition))
+    fetch.checkAndComplete(TopicPartitionOperationKey(topicPartition))
   }
 
   def checkAndCompleteProduce(): Unit = {
-    produce.checkAndComplete(new TopicPartitionOperationKey(topicPartition))
+    produce.checkAndComplete(TopicPartitionOperationKey(topicPartition))
   }
 
   def checkAndCompleteDeleteRecords(): Unit = {
-    deleteRecords.checkAndComplete(new TopicPartitionOperationKey(topicPartition))
+    deleteRecords.checkAndComplete(TopicPartitionOperationKey(topicPartition))
   }
 
   def numDelayedDelete: Int = deleteRecords.numDelayed
diff --git a/core/src/main/scala/kafka/controller/Election.scala b/core/src/main/scala/kafka/controller/Election.scala
index 9209992..3896e00 100644
--- a/core/src/main/scala/kafka/controller/Election.scala
+++ b/core/src/main/scala/kafka/controller/Election.scala
@@ -33,8 +33,8 @@ object Election {
     leaderIsrAndControllerEpochOpt match {
       case Some(leaderIsrAndControllerEpoch) =>
         val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr,
-          liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
+        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(
+          assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
         val newLeaderAndIsrOpt = leaderOpt.map { leader =>
           val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
           else List(leader)
@@ -57,10 +57,13 @@ object Election {
    *
    * @return The election results
    */
-  def leaderForOffline(controllerContext: ControllerContext,
-                       partitionsWithUncleanLeaderElectionState: Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]): Seq[ElectionResult] = {
-    partitionsWithUncleanLeaderElectionState.map { case (partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled) =>
-      leaderForOffline(partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled, controllerContext)
+  def leaderForOffline(
+    controllerContext: ControllerContext,
+    partitionsWithUncleanLeaderElectionState: Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]
+  ): Seq[ElectionResult] = {
+    partitionsWithUncleanLeaderElectionState.map {
+      case (partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled) =>
+        leaderForOffline(partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled, controllerContext)
     }
   }
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0880967..8abb26a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -22,14 +22,16 @@ import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminOperationException
 import kafka.api._
 import kafka.common._
-import kafka.controller.KafkaController.ElectPreferredLeadersCallback
+import kafka.controller.KafkaController.ElectLeadersCallback
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server._
 import kafka.utils._
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zk._
 import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.ElectionType
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -42,16 +44,16 @@ import scala.collection.JavaConverters._
 import scala.collection._
 import scala.util.{Failure, Try}
 
-sealed trait ElectionType
-object AutoTriggered extends ElectionType
-object ZkTriggered extends ElectionType
-object AdminClientTriggered extends ElectionType
+sealed trait ElectionTrigger
+final case object AutoTriggered extends ElectionTrigger
+final case object ZkTriggered extends ElectionTrigger
+final case object AdminClientTriggered extends ElectionTrigger
 
 object KafkaController extends Logging {
   val InitialControllerEpoch = 0
   val InitialControllerEpochZkVersion = 0
 
-  type ElectPreferredLeadersCallback = (Map[TopicPartition, Int], Map[TopicPartition, ApiError]) => Unit
+  type ElectLeadersCallback = Map[TopicPartition, Either[ApiError, Int]] => Unit
 }
 
 class KafkaController(val config: KafkaConfig,
@@ -272,7 +274,7 @@ class KafkaController(val config: KafkaConfig,
     maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
     topicDeletionManager.tryTopicDeletion()
     val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
-    onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered)
+    onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)
     info("Starting the controller scheduler")
     kafkaScheduler.startup()
     if (config.autoLeaderRebalanceEnable) {
@@ -487,7 +489,11 @@ class KafkaController(val config: KafkaConfig,
     info(s"New partition creation callback for ${newPartitions.mkString(",")}")
     partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
     replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
-    partitionStateMachine.handleStateChanges(newPartitions.toSeq, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy))
+    partitionStateMachine.handleStateChanges(
+      newPartitions.toSeq,
+      OnlinePartition,
+      Some(OfflinePartitionLeaderElectionStrategy(false))
+    )
     replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
   }
 
@@ -631,34 +637,53 @@ class KafkaController(val config: KafkaConfig,
     removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
   }
 
-
   /**
-    * Attempt to elect the preferred replica as leader for each of the given partitions.
-    * @param partitions The partitions to have their preferred leader elected
-    * @param electionType The election type
-    * @return A map of failed elections where keys are partitions which had an error and the corresponding value is
-    *         the exception that was thrown.
+    * Attempt to elect a replica as leader for each of the given partitions.
+    * @param partitions The partitions to have a new leader elected
+    * @param electionType The type of election to perform
+    * @param electionTrigger The reason for triggering this election
+    * @return A map of failed and successful elections. The keys are the topic partitions and the corresponding values are
+    *         either the exception that was thrown or new leader & ISR.
     */
-  private def onPreferredReplicaElection(partitions: Set[TopicPartition],
-                                         electionType: ElectionType): Map[TopicPartition, Throwable] = {
-    info(s"Starting preferred replica leader election for partitions ${partitions.mkString(",")}")
+  private[this] def onReplicaElection(
+    partitions: Set[TopicPartition],
+    electionType: ElectionType,
+    electionTrigger: ElectionTrigger
+  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
+    info(s"Starting replica leader election ($electionType) for partitions ${partitions.mkString(",")} triggerd by $electionTrigger")
     try {
-      val results = partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition,
-        Some(PreferredReplicaPartitionLeaderElectionStrategy))
-      if (electionType != AdminClientTriggered) {
-        results.foreach { case (tp, throwable) =>
-          if (throwable.isInstanceOf[ControllerMovedException]) {
-            error(s"Error completing preferred replica leader election for partition $tp because controller has moved to another broker.", throwable)
-            throw throwable
-          } else {
-            error(s"Error completing preferred replica leader election for partition $tp", throwable)
-          }
+      val strategy = electionType match {
+        case ElectionType.PREFERRED => PreferredReplicaPartitionLeaderElectionStrategy
+        case ElectionType.UNCLEAN =>
+          /* Let's be conservative and only trigger unclean election if the election type is unclean and it was
+           * triggered by the admin client
+           */
+          OfflinePartitionLeaderElectionStrategy(allowUnclean = electionTrigger == AdminClientTriggered)
+      }
+
+      val results = partitionStateMachine.handleStateChanges(
+        partitions.toSeq,
+        OnlinePartition,
+        Some(strategy)
+      )
+      if (electionTrigger != AdminClientTriggered) {
+        results.foreach {
+          case (tp, Left(throwable)) =>
+            if (throwable.isInstanceOf[ControllerMovedException]) {
+              info(s"Error completing replica leader election ($electionType) for partition $tp because controller has moved to another broker.", throwable)
+              throw throwable
+            } else {
+              error(s"Error completing replica leader election ($electionType) for partition $tp", throwable)
+            }
+          case (_, Right(_)) => // Ignored; No need to log or throw exception for the success cases
         }
       }
-      return results;
+
+      results
     } finally {
-      if (electionType != AdminClientTriggered)
-        removePartitionsFromPreferredReplicaElection(partitions, electionType == AutoTriggered)
+      if (electionTrigger != AdminClientTriggered) {
+        removePartitionsFromPreferredReplicaElection(partitions, electionTrigger == AutoTriggered)
+      }
     }
   }
 
@@ -898,7 +923,7 @@ class KafkaController(val config: KafkaConfig,
     if (!isTriggeredByAutoRebalance) {
       zkClient.deletePreferredReplicaElection(controllerContext.epochZkVersion)
       // Ensure we detect future preferred replica leader elections
-      eventManager.put(PreferredReplicaLeaderElection(None))
+      eventManager.put(ReplicaLeaderElection(None, ElectionType.PREFERRED, ZkTriggered))
     }
   }
 
@@ -943,16 +968,17 @@ class KafkaController(val config: KafkaConfig,
           // assigned replica list
           val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
           // update the new leadership decision in zookeeper or retry
-          val UpdateLeaderAndIsrResult(successfulUpdates, _, failedUpdates) =
+          val UpdateLeaderAndIsrResult(finishedUpdates, _) =
             zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion)
-          if (successfulUpdates.contains(partition)) {
-            val finalLeaderAndIsr = successfulUpdates(partition)
-            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
-            info(s"Updated leader epoch for partition $partition to ${finalLeaderAndIsr.leaderEpoch}")
-            true
-          } else if (failedUpdates.contains(partition)) {
-            throw failedUpdates(partition)
-          } else false
+
+          finishedUpdates.headOption.map {
+            case (partition, Right(leaderAndIsr)) =>
+              finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
+              info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}")
+              true
+            case (partition, Left(e)) =>
+              throw e
+          }.getOrElse(false)
         case None =>
           throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " +
             "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist")
@@ -992,7 +1018,7 @@ class KafkaController(val config: KafkaConfig,
           controllerContext.partitionsBeingReassigned.isEmpty &&
           !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
           controllerContext.allTopics.contains(tp.topic))
-        onPreferredReplicaElection(candidatePartitions.toSet, AutoTriggered)
+        onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
       }
     }
   }
@@ -1465,71 +1491,95 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-
-  def electPreferredLeaders(partitions: Set[TopicPartition], callback: ElectPreferredLeadersCallback = { (_,_) => }): Unit = {
-    eventManager.put(PreferredReplicaLeaderElection(Some(partitions), AdminClientTriggered, callback))
+  def electLeaders(
+    partitions: Set[TopicPartition],
+    electionType: ElectionType,
+    callback: ElectLeadersCallback
+  ): Unit = {
+    eventManager.put(ReplicaLeaderElection(Some(partitions), electionType, AdminClientTriggered, callback))
   }
 
-  private def preemptPreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]], callback: ElectPreferredLeadersCallback = (_, _) =>{}): Unit = {
-    callback(Map.empty, partitionsFromAdminClientOpt match {
-      case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap
-      case None => Map.empty
-    })
+  private def preemptReplicaLeaderElection(
+    partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
+    callback: ElectLeadersCallback
+  ): Unit = {
+    callback(
+      partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions =>
+        partitions.map(partition => partition -> Left(new ApiError(Errors.NOT_CONTROLLER, null)))(breakOut)
+      }
+    )
   }
 
-  private def processPreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
-                                                    electionType: ElectionType = ZkTriggered,
-                                                    callback: ElectPreferredLeadersCallback = (_,_) =>{}): Unit = {
+  private def processReplicaLeaderElection(
+    partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
+    electionType: ElectionType,
+    electionTrigger: ElectionTrigger,
+    callback: ElectLeadersCallback
+  ): Unit = {
     if (!isActive) {
-      callback(Map.empty, partitionsFromAdminClientOpt match {
-        case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap
-        case None => Map.empty
+      callback(partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions =>
+        partitions.map(partition => partition -> Left(new ApiError(Errors.NOT_CONTROLLER, null)))(breakOut)
       })
     } else {
       // We need to register the watcher if the path doesn't exist in order to detect future preferred replica
       // leader elections and we get the `path exists` check for free
-      if (electionType == AdminClientTriggered || zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
+      if (electionTrigger == AdminClientTriggered || zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
         val partitions = partitionsFromAdminClientOpt match {
           case Some(partitions) => partitions
           case None => zkClient.getPreferredReplicaElection
         }
 
-        val (validPartitions, invalidPartitions) = partitions.partition(tp => controllerContext.allPartitions.contains(tp))
-        invalidPartitions.foreach { p =>
-          info(s"Skipping preferred replica leader election for partition ${p} since it doesn't exist.")
+        val (knownPartitions, unknownPartitions) = partitions.partition(tp => controllerContext.allPartitions.contains(tp))
+        unknownPartitions.foreach { p =>
+          info(s"Skipping replica leader election ($electionType) for partition $p by $electionTrigger since it doesn't exist.")
         }
 
-        val (partitionsBeingDeleted, livePartitions) = validPartitions.partition(partition =>
-          topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
+        val (partitionsBeingDeleted, livePartitions) = knownPartitions.partition(partition =>
+            topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
         if (partitionsBeingDeleted.nonEmpty) {
-          warn(s"Skipping preferred replica election for partitions $partitionsBeingDeleted " +
-            s"since the respective topics are being deleted")
+          warn(s"Skipping replica leader election ($electionType) for partitions $partitionsBeingDeleted " +
+            s"by $electionTrigger since the respective topics are being deleted")
         }
-        // partition those where preferred is already leader
-        val (electablePartitions, alreadyPreferred) = livePartitions.partition { partition =>
-          val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
-          val preferredReplica = assignedReplicas.head
-          val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
-          currentLeader != preferredReplica
+
+        // split live partitions into those that need an election and those that already have a valid leader
+        val (electablePartitions, alreadyValidLeader) = livePartitions.partition { partition =>
+          electionType match {
+            case ElectionType.PREFERRED =>
+              val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
+              val preferredReplica = assignedReplicas.head
+              val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
+              currentLeader != preferredReplica
+
+            case ElectionType.UNCLEAN =>
+              val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
+              currentLeader == LeaderAndIsr.NoLeader || !controllerContext.liveBrokerIds.contains(currentLeader)
+          }
         }
 
-        val electionErrors = onPreferredReplicaElection(electablePartitions, electionType)
-        val successfulPartitions = electablePartitions -- electionErrors.keySet
-        val results = electionErrors.map { case (partition, ex) =>
-          val apiError = if (ex.isInstanceOf[StateChangeFailedException])
-            new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE, ex.getMessage)
-          else
-            ApiError.fromThrowable(ex)
-          partition -> apiError
+        val results = onReplicaElection(electablePartitions, electionType, electionTrigger).mapValues {
+          case Left(ex) =>
+            if (ex.isInstanceOf[StateChangeFailedException]) {
+              val error = if (electionType == ElectionType.PREFERRED) {
+                Errors.PREFERRED_LEADER_NOT_AVAILABLE
+              } else {
+                Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
+              }
+              Left(new ApiError(error, ex.getMessage))
+            } else {
+              Left(ApiError.fromThrowable(ex))
+            }
+          case Right(leaderAndIsr) => Right(leaderAndIsr.leader)
         } ++
-          alreadyPreferred.map(_ -> ApiError.NONE) ++
-          partitionsBeingDeleted.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted")) ++
-          invalidPartitions.map ( tp => tp -> new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, s"The partition does not exist.")
-          )
-        debug(s"PreferredReplicaLeaderElection waiting: $successfulPartitions, results: $results")
-        callback(successfulPartitions.map(
-          tp => tp->controllerContext.partitionReplicaAssignment(tp).head).toMap,
-          results)
+        alreadyValidLeader.map(_ -> Left(new ApiError(Errors.ELECTION_NOT_NEEDED))) ++
+        partitionsBeingDeleted.map(
+          _ -> Left(new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted"))
+        ) ++
+        unknownPartitions.map(
+          _ -> Left(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist."))
+        )
+
+        debug(s"Waiting for any successful result for election type ($electionType) by $electionTrigger for partitions: $results")
+        callback(results)
       }
     }
   }
@@ -1557,13 +1607,15 @@ class KafkaController(val config: KafkaConfig,
   override def process(event: ControllerEvent): Unit = {
     try {
       event match {
-        // Used only in test cases
         case event: MockEvent =>
+          // Used only in test cases
           event.process()
+        case ShutdownEventThread =>
+          error("Received a ShutdownEventThread event. This type of event is supposed to be handle by ControllerEventThread")
         case AutoPreferredReplicaLeaderElection =>
           processAutoPreferredReplicaLeaderElection()
-        case PreferredReplicaLeaderElection(partitions, electionType, callback) =>
-          processPreferredReplicaLeaderElection(partitions, electionType, callback)
+        case ReplicaLeaderElection(partitions, electionType, electionTrigger, callback) =>
+          processReplicaLeaderElection(partitions, electionType, electionTrigger, callback)
         case UncleanLeaderElectionEnable =>
           processUncleanLeaderElectionEnable()
         case TopicUncleanLeaderElectionEnable(topic) =>
@@ -1602,8 +1654,6 @@ class KafkaController(val config: KafkaConfig,
           processIsrChangeNotification()
         case Startup =>
           processStartup()
-        case ShutdownEventThread =>
-          // not handled here
       }
     } catch {
       case e: ControllerMovedException =>
@@ -1618,8 +1668,8 @@ class KafkaController(val config: KafkaConfig,
 
   override def preempt(event: ControllerEvent): Unit = {
     event match {
-      case PreferredReplicaLeaderElection(partitions, _, callback) =>
-        preemptPreferredReplicaLeaderElection(partitions, callback)
+      case ReplicaLeaderElection(partitions, _, _, callback) =>
+        preemptReplicaLeaderElection(partitions, callback)
       case ControlledShutdown(id, brokerEpoch, callback) =>
         preemptControlledShutdown(id, brokerEpoch, callback)
       case _ =>
@@ -1699,7 +1749,7 @@ object IsrChangeNotificationHandler {
 class PreferredReplicaElectionHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
   override val path: String = PreferredReplicaElectionZNode.path
 
-  override def handleCreation(): Unit = eventManager.put(PreferredReplicaLeaderElection(None))
+  override def handleCreation(): Unit = eventManager.put(ReplicaLeaderElection(None, ElectionType.PREFERRED, ZkTriggered))
 }
 
 class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
@@ -1842,9 +1892,12 @@ case object IsrChangeNotification extends ControllerEvent {
   override def state: ControllerState = ControllerState.IsrChange
 }
 
-case class PreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
-                                          electionType: ElectionType = ZkTriggered,
-                                          callback: ElectPreferredLeadersCallback = (_,_) => {}) extends ControllerEvent {
+case class ReplicaLeaderElection(
+  partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
+  electionType: ElectionType,
+  electionTrigger: ElectionTrigger,
+  callback: ElectLeadersCallback = _ => {}
+) extends ControllerEvent {
   override def state: ControllerState = ControllerState.ManualLeaderBalance
 }
 
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 637cea8..ab4e8d4 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -21,13 +21,14 @@ import kafka.common.StateChangeFailedException
 import kafka.controller.Election._
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
+import kafka.zk.KafkaZkClient
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
-import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.TopicPartitionStateZNode
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
-
+import scala.collection.breakOut
 import scala.collection.mutable
 
 abstract class PartitionStateMachine(controllerContext: ControllerContext) extends Logging {
@@ -70,7 +71,7 @@ abstract class PartitionStateMachine(controllerContext: ControllerContext) exten
       !controllerContext.isTopicQueuedUpForDeletion(partition.topic)
     }.toSeq
 
-    handleStateChanges(partitionsToTrigger, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy))
+    handleStateChanges(partitionsToTrigger, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false)))
     // TODO: If handleStateChanges catches an exception, it is not enough to bail out and log an error.
     // It is important to trigger leader election for those partitions.
   }
@@ -96,14 +97,18 @@ abstract class PartitionStateMachine(controllerContext: ControllerContext) exten
     }
   }
 
-  def handleStateChanges(partitions: Seq[TopicPartition],
-                         targetState: PartitionState): Map[TopicPartition, Throwable] = {
+  def handleStateChanges(
+    partitions: Seq[TopicPartition],
+    targetState: PartitionState
+  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
     handleStateChanges(partitions, targetState, None)
   }
 
-  def handleStateChanges(partitions: Seq[TopicPartition],
-                         targetState: PartitionState,
-                         leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable]
+  def handleStateChanges(
+    partitions: Seq[TopicPartition],
+    targetState: PartitionState,
+    leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]
+  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
 
 }
 
@@ -130,31 +135,40 @@ class ZkPartitionStateMachine(config: KafkaConfig,
   this.logIdent = s"[PartitionStateMachine controllerId=$controllerId] "
 
   /**
-    * Try to change the state of the given partitions to the given targetState, using the given
-    * partitionLeaderElectionStrategyOpt if a leader election is required.
-    * @param partitions The partitions
-    * @param targetState The state
-    * @param partitionLeaderElectionStrategyOpt The leader election strategy if a leader election is required.
-    * @return partitions and corresponding throwable for those partitions which could not transition to the given state
-    */
-  override def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
-                         partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = {
+   * Try to change the state of the given partitions to the given targetState, using the given
+   * partitionLeaderElectionStrategyOpt if a leader election is required.
+   * @param partitions The partitions
+   * @param targetState The state
+   * @param partitionLeaderElectionStrategyOpt The leader election strategy if a leader election is required.
+   * @return A map of failed and successful elections when targetState is OnlinePartition. The keys are the
+   *         topic partitions and the corresponding values are either the exception that was thrown or the new
+   *         leader & ISR.
+   */
+  override def handleStateChanges(
+    partitions: Seq[TopicPartition],
+    targetState: PartitionState,
+    partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
+  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
     if (partitions.nonEmpty) {
       try {
         controllerBrokerRequestBatch.newBatch()
-        val errors = doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
+        val result = doHandleStateChanges(
+          partitions,
+          targetState,
+          partitionLeaderElectionStrategyOpt
+        )
         controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
-        errors
+        result
       } catch {
         case e: ControllerMovedException =>
           error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
           throw e
         case e: Throwable =>
           error(s"Error while moving some partitions to $targetState state", e)
-          partitions.map { _ -> e }.toMap
+          partitions.map(_ -> Left(e))(breakOut)
       }
     } else {
-      Map.empty[TopicPartition, Throwable]
+      Map.empty
     }
   }
 
@@ -183,10 +197,15 @@ class ZkPartitionStateMachine(config: KafkaConfig,
    * --nothing other than marking the partition state as NonExistentPartition
    * @param partitions  The partitions for which the state transition is invoked
    * @param targetState The end state that the partition should be moved to
+   * @return A map of failed and successful elections when targetState is OnlinePartition. The keys are the
+   *         topic partitions and the corresponding values are either the exception that was thrown or the new
+   *         leader & ISR.
    */
-  private def doHandleStateChanges(partitions: Seq[TopicPartition],
-                                   targetState: PartitionState,
-                                   partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = {
+  private def doHandleStateChanges(
+    partitions: Seq[TopicPartition],
+    targetState: PartitionState,
+    partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
+  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
     val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
     partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
     val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
@@ -212,13 +231,23 @@ class ZkPartitionStateMachine(config: KafkaConfig,
           }
         }
         if (partitionsToElectLeader.nonEmpty) {
-          val (successfulElections, failedElections) = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get)
-          successfulElections.foreach { partition =>
-            stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
-              s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
-            controllerContext.putPartitionState(partition, OnlinePartition)
+          val electionResults = electLeaderForPartitions(
+            partitionsToElectLeader,
+            partitionLeaderElectionStrategyOpt.getOrElse(
+              throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")
+            )
+          )
+
+          electionResults.foreach {
+            case (partition, Right(leaderAndIsr)) =>
+              stateChangeLog.trace(
+                s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
+              )
+              controllerContext.putPartitionState(partition, OnlinePartition)
+            case (_, Left(_)) => // Ignore; no need to update partition state on election error
           }
-          failedElections
+
+          electionResults
         } else {
           Map.empty
         }
@@ -293,24 +322,30 @@ class ZkPartitionStateMachine(config: KafkaConfig,
    * Repeatedly attempt to elect leaders for multiple partitions until there are no more remaining partitions to retry.
    * @param partitions The partitions that we're trying to elect leaders for.
    * @param partitionLeaderElectionStrategy The election strategy to use.
-   * @return A pair with first element of which is the partitions that successfully had a leader elected
-    *        and the second element a map of failed partition to the corresponding thrown exception.
+   * @return A map of failed and successful elections. The keys are the topic partitions and the corresponding values are
+   *         either the exception that was thrown or the new leader & ISR.
    */
-  private def electLeaderForPartitions(partitions: Seq[TopicPartition],
-                                       partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): (Seq[TopicPartition], Map[TopicPartition, Throwable]) = {
-    val successfulElections = mutable.Buffer.empty[TopicPartition]
+  private def electLeaderForPartitions(
+    partitions: Seq[TopicPartition],
+    partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
+  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
     var remaining = partitions
-    var failures = Map.empty[TopicPartition, Throwable]
+    val finishedElections = mutable.Map.empty[TopicPartition, Either[Throwable, LeaderAndIsr]]
+
     while (remaining.nonEmpty) {
-      val (success, updatesToRetry, failedElections) = doElectLeaderForPartitions(partitions, partitionLeaderElectionStrategy)
+      val (finished, updatesToRetry) = doElectLeaderForPartitions(remaining, partitionLeaderElectionStrategy)
       remaining = updatesToRetry
-      successfulElections ++= success
-      failedElections.foreach { case (partition, e) =>
-        logFailedStateChange(partition, partitionState(partition), OnlinePartition, e)
+
+      finished.foreach {
+        case (partition, Left(e)) =>
+          logFailedStateChange(partition, partitionState(partition), OnlinePartition, e)
+        case (_, Right(_)) => // Ignore; success so no need to log failed state change
       }
-      failures ++= failedElections
+
+      finishedElections ++= finished
     }
-    (successfulElections, failures)
+
+    finishedElections.toMap
   }
 
   /**
@@ -319,21 +354,23 @@ class ZkPartitionStateMachine(config: KafkaConfig,
    *
    * @param partitions The partitions that we're trying to elect leaders for.
    * @param partitionLeaderElectionStrategy The election strategy to use.
-   * @return A tuple of three values:
-   *         1. The partitions that successfully had a leader elected.
+   * @return A tuple of two values:
+   *         1. A map from each finished partition to either its new leader and ISR (successful election) or the
+   *         exception for a failed election that should not be retried.
    *         2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
    *         the partition leader updated partition state while the controller attempted to update partition state.
-   *         3. Exceptions corresponding to failed elections that should not be retried.
    */
-  private def doElectLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
-  (Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception]) = {
+  private def doElectLeaderForPartitions(
+    partitions: Seq[TopicPartition],
+    partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
+  ): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = {
     val getDataResponses = try {
       zkClient.getTopicPartitionStatesRaw(partitions)
     } catch {
       case e: Exception =>
-        return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
+        return (partitions.map(_ -> Left(e))(breakOut), Seq.empty)
     }
-    val failedElections = mutable.Map.empty[TopicPartition, Exception]
+    val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
     val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicPartition, LeaderIsrAndControllerEpoch)]
     getDataResponses.foreach { getDataResponse =>
       val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
@@ -342,16 +379,17 @@ class ZkPartitionStateMachine(config: KafkaConfig,
         val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
         if (leaderIsrAndControllerEpochOpt.isEmpty) {
           val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
-          failedElections.put(partition, exception)
+          failedElections.put(partition, Left(exception))
         }
         leaderIsrAndControllerEpochPerPartition += partition -> leaderIsrAndControllerEpochOpt.get
       } else if (getDataResponse.resultCode == Code.NONODE) {
         val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
-        failedElections.put(partition, exception)
+        failedElections.put(partition, Left(exception))
       } else {
-        failedElections.put(partition, getDataResponse.resultException.get)
+        failedElections.put(partition, Left(getDataResponse.resultException.get))
       }
     }
+
     val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
       leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
     }
@@ -359,14 +397,19 @@ class ZkPartitionStateMachine(config: KafkaConfig,
       val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
         s"already written by another controller. This probably means that the current controller $controllerId went through " +
         s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
-      failedElections.put(partition, new StateChangeFailedException(failMsg))
+      failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
     }
+
     if (validPartitionsForElection.isEmpty) {
-      return (Seq.empty, Seq.empty, failedElections.toMap)
+      return (failedElections.toMap, Seq.empty)
     }
+
     val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
-      case OfflinePartitionLeaderElectionStrategy =>
-        val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(validPartitionsForElection)
+      case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
+        val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
+          validPartitionsForElection,
+          allowUnclean
+        )
         leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
       case ReassignPartitionLeaderElectionStrategy =>
         leaderForReassign(controllerContext, validPartitionsForElection).partition(_.leaderAndIsr.isEmpty)
@@ -378,37 +421,79 @@ class ZkPartitionStateMachine(config: KafkaConfig,
     partitionsWithoutLeaders.foreach { electionResult =>
       val partition = electionResult.topicPartition
       val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
-      failedElections.put(partition, new StateChangeFailedException(failMsg))
+      failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
     }
     val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
     val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
-    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
+    val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
       adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
-    successfulUpdates.foreach { case (partition, leaderAndIsr) =>
-      val replicas = controllerContext.partitionReplicaAssignment(partition)
-      val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
-      controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
-      controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
-        leaderIsrAndControllerEpoch, replicas, isNew = false)
+    finishedUpdates.foreach { case (partition, result) =>
+      result.right.foreach { leaderAndIsr =>
+        val replicas = controllerContext.partitionReplicaAssignment(partition)
+        val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
+        controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+        controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
+          leaderIsrAndControllerEpoch, replicas, isNew = false)
+      }
     }
-    (successfulUpdates.keys.toSeq, updatesToRetry, failedElections.toMap ++ failedUpdates)
+
+    (finishedUpdates ++ failedElections, updatesToRetry)
   }
 
-  private def collectUncleanLeaderElectionState(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
-  Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)] = {
-    val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderIsrAndControllerEpochs.partition { case (partition, leaderIsrAndControllerEpoch) =>
-      val liveInSyncReplicas = leaderIsrAndControllerEpoch.leaderAndIsr.isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
-      liveInSyncReplicas.isEmpty
+  /* For the provided set of topic partitions and their partition sync state, it attempts to determine if
+   * unclean leader election should be performed. Unclean election should be performed if there are no live
+   * replicas which are in sync and unclean leader election is allowed (allowUnclean parameter is true or
+   * the topic has been configured to allow unclean election).
+   *
+   * @param leaderIsrAndControllerEpochs set of partitions for which to determine if unclean leader election
+   *                                     should be allowed
+   * @param allowUnclean whether to allow unclean election without having to read the topic configuration
+   * @return a sequence of three-element tuples:
+   *         1. topic partition
+   *         2. leader, isr and controller epoch. Some means election should be performed
+   *         3. allow unclean
+   */
+  private def collectUncleanLeaderElectionState(
+    leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)],
+    allowUnclean: Boolean
+  ): Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)] = {
+    val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderIsrAndControllerEpochs.partition {
+      case (partition, leaderIsrAndControllerEpoch) =>
+        val liveInSyncReplicas = leaderIsrAndControllerEpoch
+          .leaderAndIsr
+          .isr
+          .filter(replica => controllerContext.isReplicaOnline(replica, partition))
+        liveInSyncReplicas.isEmpty
     }
-    val (logConfigs, failed) = zkClient.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
-    partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
-      if (failed.contains(partition.topic)) {
-        logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
-        (partition, None, false)
-      } else {
-        (partition, Option(leaderIsrAndControllerEpoch), logConfigs(partition.topic).uncleanLeaderElectionEnable.booleanValue())
+
+    val electionForPartitionWithoutLiveReplicas = if (allowUnclean) {
+      partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
+        (partition, Option(leaderIsrAndControllerEpoch), true)
       }
-    } ++ partitionsWithLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) => (partition, Option(leaderIsrAndControllerEpoch), false) }
+    } else {
+      val (logConfigs, failed) = zkClient.getLogConfigs(
+        partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }(breakOut),
+        config.originals()
+      )
+
+      partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
+        if (failed.contains(partition.topic)) {
+          logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
+          (partition, None, false)
+        } else {
+          (
+            partition,
+            Option(leaderIsrAndControllerEpoch),
+            logConfigs(partition.topic).uncleanLeaderElectionEnable.booleanValue()
+          )
+        }
+      }
+    }
+
+    electionForPartitionWithoutLiveReplicas ++
+    partitionsWithLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
+      (partition, Option(leaderIsrAndControllerEpoch), false)
+    }
   }
 
   private def logInvalidTransition(partition: TopicPartition, targetState: PartitionState): Unit = {
@@ -458,10 +543,10 @@ object PartitionLeaderElectionAlgorithms {
 }
 
 sealed trait PartitionLeaderElectionStrategy
-case object OfflinePartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
-case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
-case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
-case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
+final case class OfflinePartitionLeaderElectionStrategy(allowUnclean: Boolean) extends PartitionLeaderElectionStrategy
+final case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
+final case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
+final case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
 
 sealed trait PartitionState {
   def state: Byte
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index d10d9b6..cdc1d22 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -20,12 +20,13 @@ import kafka.api.LeaderAndIsr
 import kafka.common.StateChangeFailedException
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
-import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zk.TopicPartitionStateZNode
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.zookeeper.KeeperException.Code
-
+import scala.collection.breakOut
 import scala.collection.mutable
 
 abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging {
@@ -274,18 +275,23 @@ class ZkReplicaStateMachine(config: KafkaConfig,
    * @param partitions The partitions from which we're trying to remove the replica from isr
    * @return The updated LeaderIsrAndControllerEpochs of all partitions for which we successfully removed the replica from isr.
    */
-  private def removeReplicasFromIsr(replicaId: Int, partitions: Seq[TopicPartition]):
-  Map[TopicPartition, LeaderIsrAndControllerEpoch] = {
+  private def removeReplicasFromIsr(
+    replicaId: Int,
+    partitions: Seq[TopicPartition]
+  ): Map[TopicPartition, LeaderIsrAndControllerEpoch] = {
     var results = Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
     var remaining = partitions
     while (remaining.nonEmpty) {
-      val (successfulRemovals, removalsToRetry, failedRemovals) = doRemoveReplicasFromIsr(replicaId, remaining)
-      results ++= successfulRemovals
+      val (finishedRemoval, removalsToRetry) = doRemoveReplicasFromIsr(replicaId, remaining)
       remaining = removalsToRetry
-      failedRemovals.foreach { case (partition, e) =>
-        val replica = PartitionAndReplica(partition, replicaId)
-        val currentState = controllerContext.replicaState(replica)
-        logFailedStateChange(replica, currentState, OfflineReplica, e)
+
+      finishedRemoval.foreach {
+        case (partition, Left(e)) =>
+            val replica = PartitionAndReplica(partition, replicaId)
+            val currentState = controllerContext.replicaState(replica)
+            logFailedStateChange(replica, currentState, OfflineReplica, e)
+        case (partition, Right(leaderIsrAndEpoch)) =>
+          results += partition -> leaderIsrAndEpoch
       }
     }
     results
@@ -297,86 +303,117 @@ class ZkReplicaStateMachine(config: KafkaConfig,
    *
    * @param replicaId The replica being removed from isr of multiple partitions
    * @param partitions The partitions from which we're trying to remove the replica from isr
-   * @return A tuple of three values:
-   *         1. The updated LeaderIsrAndControllerEpochs of all partitions for which we successfully removed the replica from isr.
+   * @return A tuple of two elements:
+   *         1. The updated Right[LeaderIsrAndControllerEpochs] of all partitions for which we successfully
+   *         removed the replica from isr. Or Left[Exception] corresponding to failed removals that should
+   *         not be retried
    *         2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
    *         the partition leader updated partition state while the controller attempted to update partition state.
-   *         3. Exceptions corresponding to failed removals that should not be retried.
    */
-  private def doRemoveReplicasFromIsr(replicaId: Int, partitions: Seq[TopicPartition]):
-  (Map[TopicPartition, LeaderIsrAndControllerEpoch],
-    Seq[TopicPartition],
-    Map[TopicPartition, Exception]) = {
-    val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk, failedStateReads) = getTopicPartitionStatesFromZk(partitions)
-    val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) }
-    val adjustedLeaderAndIsrs = leaderAndIsrsWithReplica.mapValues { leaderAndIsr =>
-      val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
-      val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
-      leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
+  private def doRemoveReplicasFromIsr(
+    replicaId: Int,
+    partitions: Seq[TopicPartition]
+  ): (Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]], Seq[TopicPartition]) = {
+    val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk) = getTopicPartitionStatesFromZk(partitions)
+    val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, result) =>
+      result.right.map { leaderAndIsr =>
+        leaderAndIsr.isr.contains(replicaId)
+      }.right.getOrElse(false)
     }
-    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
-      adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
-    val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
-      if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
-        val exception = new StateChangeFailedException(s"Failed to change state of replica $replicaId for partition $partition since the leader and isr path in zookeeper is empty")
-        Option(partition -> exception)
-      } else None
-    }.toMap
-    val leaderIsrAndControllerEpochs = (leaderAndIsrsWithoutReplica ++ successfulUpdates).map { case (partition, leaderAndIsr) =>
-      val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
-      controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
-      partition -> leaderIsrAndControllerEpoch
+
+    val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrsWithReplica.flatMap {
+      case (partition, result) =>
+        result.right.toOption.map { leaderAndIsr =>
+          val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
+          val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
+          partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
+        }
     }
-    (leaderIsrAndControllerEpochs, updatesToRetry, failedStateReads ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk ++ failedUpdates)
+
+    val UpdateLeaderAndIsrResult(finishedPartitions, updatesToRetry) = zkClient.updateLeaderAndIsr(
+      adjustedLeaderAndIsrs,
+      controllerContext.epoch,
+      controllerContext.epochZkVersion
+    )
+
+    val exceptionsForPartitionsWithNoLeaderAndIsrInZk: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
+      partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
+        if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
+          val exception = new StateChangeFailedException(
+            s"Failed to change state of replica $replicaId for partition $partition since the leader and isr " +
+            "path in zookeeper is empty"
+          )
+          Option(partition -> Left(exception))
+        } else None
+      }(breakOut)
+
+    val leaderIsrAndControllerEpochs: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
+      (leaderAndIsrsWithoutReplica ++ finishedPartitions).map { case (partition, result: Either[Exception, LeaderAndIsr]) =>
+        (
+          partition,
+          result.right.map { leaderAndIsr =>
+            val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
+            controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+            leaderIsrAndControllerEpoch
+          }
+          )
+      }
+
+    (
+      leaderIsrAndControllerEpochs ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk,
+      updatesToRetry
+    )
   }
 
   /**
    * Gets the partition state from zookeeper
    * @param partitions the partitions whose state we want from zookeeper
-   * @return A tuple of three values:
-   *         1. The LeaderAndIsrs of partitions whose state we successfully read from zookeeper
+   * @return A tuple of two values:
+   *         1. The Right(LeaderAndIsrs) of partitions whose state we successfully read from zookeeper.
+   *         Left(Exception) for failed zookeeper lookups or states whose controller epoch exceeds our current epoch
    *         2. The partitions that had no leader and isr state in zookeeper. This happens if the controller
    *         didn't finish partition initialization.
-   *         3. Exceptions corresponding to failed zookeeper lookups or states whose controller epoch exceeds our current epoch.
    */
-  private def getTopicPartitionStatesFromZk(partitions: Seq[TopicPartition]):
-  (Map[TopicPartition, LeaderAndIsr],
-    Seq[TopicPartition],
-    Map[TopicPartition, Exception]) = {
-    val leaderAndIsrs = mutable.Map.empty[TopicPartition, LeaderAndIsr]
-    val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicPartition]
-    val failed = mutable.Map.empty[TopicPartition, Exception]
+  private def getTopicPartitionStatesFromZk(
+    partitions: Seq[TopicPartition]
+  ): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = {
     val getDataResponses = try {
       zkClient.getTopicPartitionStatesRaw(partitions)
     } catch {
       case e: Exception =>
-        partitions.foreach(partition => failed.put(partition, e))
-        return (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, failed.toMap)
+        return (partitions.map(_ -> Left(e))(breakOut), Seq.empty)
     }
+
+    val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicPartition]
+    val result = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
+
     getDataResponses.foreach { getDataResponse =>
       val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
-      if (getDataResponse.resultCode == Code.OK) {
-        val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
-        if (leaderIsrAndControllerEpochOpt.isEmpty) {
-          partitionsWithNoLeaderAndIsrInZk += partition
-        } else {
-          val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
-          if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
-            val exception = new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
-              s"means the current controller with epoch ${controllerContext.epoch} went through a soft failure and another " +
-              s"controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}. Aborting state change by this controller")
-            failed.put(partition, exception)
-          } else {
-            leaderAndIsrs.put(partition, leaderIsrAndControllerEpoch.leaderAndIsr)
-          }
+      val _: Unit = if (getDataResponse.resultCode == Code.OK) {
+        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
+          case None =>
+            partitionsWithNoLeaderAndIsrInZk += partition
+          case Some(leaderIsrAndControllerEpoch) =>
+            if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
+              val exception = new StateChangeFailedException(
+                "Leader and isr path written by another controller. This probably " +
+                s"means the current controller with epoch ${controllerContext.epoch} went through a soft failure and " +
+                s"another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}. Aborting " +
+                "state change by this controller"
+              )
+              result += (partition -> Left(exception))
+            } else {
+              result += (partition -> Right(leaderIsrAndControllerEpoch.leaderAndIsr))
+            }
         }
       } else if (getDataResponse.resultCode == Code.NONODE) {
         partitionsWithNoLeaderAndIsrInZk += partition
       } else {
-        failed.put(partition, getDataResponse.resultException.get)
+        result += (partition -> Left(getDataResponse.resultException.get))
       }
     }
-    (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, failed.toMap)
+
+    (result.toMap, partitionsWithNoLeaderAndIsrInZk)
   }
 
   private def logSuccessfulTransition(replicaId: Int, partition: TopicPartition, currState: ReplicaState, targetState: ReplicaState): Unit = {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 5a16193..91cf79e 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -999,7 +999,10 @@ object LogManager {
     val defaultLogConfig = LogConfig(defaultProps)
 
     // read the log configurations from zookeeper
-    val (topicConfigs, failed) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
+    val (topicConfigs, failed) = zkClient.getLogConfigs(
+      zkClient.getAllTopicsInCluster,
+      defaultProps
+    )
     if (!failed.isEmpty) throw failed.head._2
 
     val cleanerConfig = LogCleaner.cleanerConfig(config)
diff --git a/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala b/core/src/main/scala/kafka/server/DelayedElectLeader.scala
similarity index 83%
rename from core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala
rename to core/src/main/scala/kafka/server/DelayedElectLeader.scala
index f3543a8..9a1546b 100644
--- a/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala
+++ b/core/src/main/scala/kafka/server/DelayedElectLeader.scala
@@ -23,15 +23,16 @@ import org.apache.kafka.common.requests.ApiError
 
 import scala.collection.{Map, mutable}
 
-/** A delayed elect preferred leader operation that can be created by the replica manager and watched
-  * in the elect preferred leader purgatory
+/** A delayed elect leader operation that can be created by the replica manager and watched
+  * in the elect leader purgatory
   */
-class DelayedElectPreferredLeader(delayMs: Long,
-                                  expectedLeaders: Map[TopicPartition, Int],
-                                  results: Map[TopicPartition, ApiError],
-                                  replicaManager: ReplicaManager,
-                                  responseCallback: Map[TopicPartition, ApiError] => Unit)
-    extends DelayedOperation(delayMs) {
+class DelayedElectLeader(
+  delayMs: Long,
+  expectedLeaders: Map[TopicPartition, Int],
+  results: Map[TopicPartition, ApiError],
+  replicaManager: ReplicaManager,
+  responseCallback: Map[TopicPartition, ApiError] => Unit
+) extends DelayedOperation(delayMs) {
 
   var waitingPartitions = expectedLeaders
   val fullResults = results.to[mutable.Set]
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index bfa7fc2..3be412b 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -33,11 +33,16 @@ object DelayedOperationKey {
 /* used by delayed-produce and delayed-fetch operations */
 case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
 
-  def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
 
   override def keyLabel = "%s-%d".format(topic, partition)
 }
 
+object TopicPartitionOperationKey {
+  def apply(topicPartition: TopicPartition): TopicPartitionOperationKey = {
+    apply(topicPartition.topic, topicPartition.partition)
+  }
+}
+
 /* used by delayed-join-group operations */
 case class MemberKey(groupId: String, consumerId: String) extends DelayedOperationKey {
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9cedb01..8b41902 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,15 +17,16 @@
 
 package kafka.server
 
-import java.lang.{Long => JLong}
 import java.lang.{Byte => JByte}
+import java.lang.{Long => JLong}
 import java.nio.ByteBuffer
 import java.util
+import java.util.Optional
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Collections, Optional, Properties}
 
 import kafka.admin.{AdminUtils, RackAwareMode}
+import kafka.api.ElectLeadersRequestOps
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
@@ -47,9 +48,23 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.message.CreateTopicsResponseData
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
-import org.apache.kafka.common.message._
+import org.apache.kafka.common.message.DeleteTopicsResponseData
 import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
+import org.apache.kafka.common.message.DescribeGroupsResponseData
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.FindCoordinatorResponseData
+import org.apache.kafka.common.message.HeartbeatResponseData
+import org.apache.kafka.common.message.InitProducerIdResponseData
+import org.apache.kafka.common.message.JoinGroupResponseData
+import org.apache.kafka.common.message.LeaveGroupResponseData
+import org.apache.kafka.common.message.OffsetCommitRequestData
+import org.apache.kafka.common.message.OffsetCommitResponseData
+import org.apache.kafka.common.message.SaslAuthenticateResponseData
+import org.apache.kafka.common.message.SaslHandshakeResponseData
+import org.apache.kafka.common.message.SyncGroupResponseData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -153,7 +168,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
         case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
-        case ApiKeys.ELECT_PREFERRED_LEADERS => handleElectPreferredReplicaLeader(request)
+        case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
         case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
       }
     } catch {
@@ -264,7 +279,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       if (replicaManager.hasDelayedElectionOperations) {
         updateMetadataRequest.partitionStates.asScala.foreach { case (tp, _) =>
-          replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp))
+          replicaManager.tryCompleteElection(TopicPartitionOperationKey(tp))
         }
       }
       sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE))
@@ -330,7 +345,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      var errorMap = new mutable.HashMap[TopicPartition, Errors]
+      val errorMap = new mutable.HashMap[TopicPartition, Errors]
       for (topicData <- offsetCommitRequest.data().topics().asScala) {
         for (partitionData <- topicData.partitions().asScala) {
           val topicPartition = new TopicPartition(topicData.name(), partitionData.partitionIndex())
@@ -936,20 +951,20 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def createTopic(topic: String,
                           numPartitions: Int,
                           replicationFactor: Int,
-                          properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
+                          properties: util.Properties = new util.Properties()): MetadataResponse.TopicMetadata = {
     try {
       adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
       info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
         .format(topic, numPartitions, replicationFactor))
       new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
-        java.util.Collections.emptyList())
+        util.Collections.emptyList())
     } catch {
       case _: TopicExistsException => // let it go, possibly another broker created this topic
         new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
-          java.util.Collections.emptyList())
+          util.Collections.emptyList())
       case ex: Throwable  => // Catch all to prevent unhandled errors
         new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, isInternal(topic),
-          java.util.Collections.emptyList())
+          util.Collections.emptyList())
     }
   }
 
@@ -966,7 +981,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
             s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
             s"and not all brokers are up yet.")
-          new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList())
+          new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
         } else {
           createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
             groupCoordinator.offsetsTopicConfigs)
@@ -977,7 +992,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
             s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
             s"and not all brokers are up yet.")
-          new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList())
+          new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList())
         } else {
           createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
             txnCoordinator.transactionTopicConfigs)
@@ -1004,13 +1019,13 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (isInternal(topic)) {
           val topicMetadata = createInternalTopic(topic)
           if (topicMetadata.error == Errors.COORDINATOR_NOT_AVAILABLE)
-            new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, true, java.util.Collections.emptyList())
+            new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
           else
             topicMetadata
         } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
           createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
         } else {
-          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList())
+          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList())
         }
       }
       topicResponses ++ responsesForNonExistentTopics
@@ -1048,7 +1063,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic =>
       new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, isInternal(topic),
-        java.util.Collections.emptyList()))
+        util.Collections.emptyList()))
 
     // do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not
     val unauthorizedForDescribeTopicMetadata =
@@ -1057,7 +1072,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         Set.empty[MetadataResponse.TopicMetadata]
       else
         unauthorizedForDescribeTopics.map(topic =>
-          new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, java.util.Collections.emptyList()))
+          new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList()))
 
     // In version 0, we returned an error when brokers with replicas were unavailable,
     // while in higher versions we simply don't include the broker in the returned broker list
@@ -1362,7 +1377,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
             .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
             .setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID)
-            .setMembers(Collections.emptyList())
+            .setMembers(util.Collections.emptyList())
         )
       )
     }  else {
@@ -1676,7 +1691,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setName(topic))
       }
       results.asScala.foreach(topic => {
-         if (!authorize(request.session, Delete, Resource(Topic, topic.name, LITERAL))) 
+         if (!authorize(request.session, Delete, Resource(Topic, topic.name, LITERAL)))
            topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
          else if (!metadataCache.contains(topic.name))
            topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
@@ -2073,7 +2088,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case None =>
         sendResponseMaybeThrottle(request, requestThrottleMs =>
           new DescribeAclsResponse(requestThrottleMs,
-            new ApiError(Errors.SECURITY_DISABLED, "No Authorizer is configured on the broker"), Collections.emptySet()))
+            new ApiError(Errors.SECURITY_DISABLED, "No Authorizer is configured on the broker"), util.Collections.emptySet()))
       case Some(auth) =>
         val filter = describeAclsRequest.filter()
         val returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) =>
@@ -2278,7 +2293,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }.toMap, describeConfigsRequest.includeSynonyms)
     val unauthorizedConfigs = unauthorizedResources.map { resource =>
       val error = configsAuthorizationApiError(request.session, resource)
-      resource -> new DescribeConfigsResponse.Config(error, Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+      resource -> new DescribeConfigsResponse.Config(error, util.Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
     }
 
     sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -2435,45 +2450,74 @@ class KafkaApis(val requestChannel: RequestChannel,
       true
   }
 
-  def handleElectPreferredReplicaLeader(request: RequestChannel.Request): Unit = {
+  def handleElectReplicaLeader(request: RequestChannel.Request): Unit = {
 
-    val electionRequest = request.body[ElectPreferredLeadersRequest]
-    val partitions =
-      if (electionRequest.data().topicPartitions() == null) {
-        metadataCache.getAllPartitions()
-      } else {
-        electionRequest.data().topicPartitions().asScala.flatMap{tp =>
-          tp.partitionId().asScala.map(partitionId => new TopicPartition(tp.topic, partitionId))}.toSet
-      }
-    def sendResponseCallback(result: Map[TopicPartition, ApiError]): Unit = {
+    val electionRequest = request.body[ElectLeadersRequest]
+
+    def sendResponseCallback(
+      error: ApiError
+    )(
+      results: Map[TopicPartition, ApiError]
+    ): Unit = {
       sendResponseMaybeThrottle(request, requestThrottleMs => {
-        val results = result.
-          groupBy{case (tp, error) => tp.topic}.
-          map{case (topic, ps) => new ElectPreferredLeadersResponseData.ReplicaElectionResult()
-            .setTopic(topic)
-            .setPartitionResult(ps.map{
-            case (tp, error) =>
-              new ElectPreferredLeadersResponseData.PartitionResult()
-                .setErrorCode(error.error.code)
-                .setErrorMessage(error.message())
-                .setPartitionId(tp.partition)}.toList.asJava)}
-        val data = new ElectPreferredLeadersResponseData()
-          .setThrottleTimeMs(requestThrottleMs)
-          .setReplicaElectionResults(results.toList.asJava)
-        new ElectPreferredLeadersResponse(data)})
+        val adjustedResults = if (electionRequest.data().topicPartitions() == null) {
+          /* When performing elections across all of the partitions we should only return
+           * partitions for which there was an election or which resulted in an error. In other
+           * words, partitions that didn't need election because they already have the correct
+           * leader are not returned to the client.
+           */
+          results.filter { case (_, error) =>
+            error.error != Errors.ELECTION_NOT_NEEDED
+          }
+        } else results
+
+        val electionResults = new util.ArrayList[ReplicaElectionResult]()
+        adjustedResults
+          .groupBy { case (tp, _) => tp.topic }
+          .foreach { case (topic, ps) =>
+            val electionResult = new ReplicaElectionResult()
+
+            electionResult.setTopic(topic)
+            ps.foreach { case (topicPartition, error) =>
+              val partitionResult = new PartitionResult()
+              partitionResult.setPartitionId(topicPartition.partition)
+              partitionResult.setErrorCode(error.error.code)
+              partitionResult.setErrorMessage(error.message)
+              electionResult.partitionResult.add(partitionResult)
+            }
+
+            electionResults.add(electionResult)
+          }
+
+        new ElectLeadersResponse(
+          requestThrottleMs,
+          error.error.code,
+          electionResults,
+          electionRequest.version
+        )
+      })
     }
+
     if (!authorize(request.session, Alter, Resource.ClusterResource)) {
-      val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null);
-      val partitionErrors =
-      if (electionRequest.data().topicPartitions() == null) {
-        // Don't leak the set of partitions if the client lack authz
-        Map.empty[TopicPartition, ApiError]
+      val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)
+      val partitionErrors: Map[TopicPartition, ApiError] =
+        electionRequest.topicPartitions.map(partition => partition -> error)(breakOut)
+
+      sendResponseCallback(error)(partitionErrors)
+    } else {
+      val partitions = if (electionRequest.data().topicPartitions() == null) {
+        metadataCache.getAllPartitions()
       } else {
-        partitions.map(partition => partition -> error).toMap
+        electionRequest.topicPartitions
       }
-      sendResponseCallback(partitionErrors)
-    } else {
-      replicaManager.electPreferredLeaders(controller, partitions, sendResponseCallback, electionRequest.data().timeoutMs())
+
+      replicaManager.electLeaders(
+        controller,
+        partitions,
+        electionRequest.electionType,
+        sendResponseCallback(ApiError.NONE),
+        electionRequest.data().timeoutMs()
+      )
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 55663d3..8cfa247 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -32,20 +32,21 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.{OffsetCheckpointFile, OffsetCheckpoints, SimpleOffsetCheckpoints}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.ElectionType
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 
 import scala.collection.JavaConverters._
 import scala.collection._
@@ -158,7 +159,7 @@ class ReplicaManager(val config: KafkaConfig,
                      val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
                      val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
                      val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
-                     val delayedElectPreferredLeaderPurgatory: DelayedOperationPurgatory[DelayedElectPreferredLeader],
+                     val delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
                      threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
 
   def this(config: KafkaConfig,
@@ -184,8 +185,8 @@ class ReplicaManager(val config: KafkaConfig,
       DelayedOperationPurgatory[DelayedDeleteRecords](
         purgatoryName = "DeleteRecords", brokerId = config.brokerId,
         purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
-      DelayedOperationPurgatory[DelayedElectPreferredLeader](
-        purgatoryName = "ElectPreferredLeader", brokerId = config.brokerId),
+      DelayedOperationPurgatory[DelayedElectLeader](
+        purgatoryName = "ElectLeader", brokerId = config.brokerId),
       threadNamePrefix)
   }
 
@@ -305,11 +306,11 @@ class ReplicaManager(val config: KafkaConfig,
 
   def getLog(topicPartition: TopicPartition): Option[Log] = logManager.getLog(topicPartition)
 
-  def hasDelayedElectionOperations: Boolean = delayedElectPreferredLeaderPurgatory.numDelayed != 0
+  def hasDelayedElectionOperations: Boolean = delayedElectLeaderPurgatory.numDelayed != 0
 
   def tryCompleteElection(key: DelayedOperationKey): Unit = {
-    val completed = delayedElectPreferredLeaderPurgatory.checkAndComplete(key)
-    debug("Request key %s unblocked %d ElectPreferredLeader.".format(key.keyLabel, completed))
+    val completed = delayedElectLeaderPurgatory.checkAndComplete(key)
+    debug("Request key %s unblocked %d ElectLeader.".format(key.keyLabel, completed))
   }
 
   def startup() {
@@ -504,7 +505,7 @@ class ReplicaManager(val config: KafkaConfig,
         val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
 
         // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
-        val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+        val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
 
         // try to complete the request immediately, otherwise put it into the purgatory
         // this is because while the delayed produce operation is being created, new
@@ -704,7 +705,7 @@ class ReplicaManager(val config: KafkaConfig,
       val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
-      val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+      val deleteRecordsRequestKeys = offsetPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
 
       // try to complete the request immediately, otherwise put it into the purgatory
       // this is because while the delayed delete records operation is being created, new
@@ -883,7 +884,7 @@ class ReplicaManager(val config: KafkaConfig,
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
+      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
 
       // try to complete the request immediately, otherwise put it into the purgatory;
       // this is because while the delayed fetch operation is being created, new requests
@@ -1326,7 +1327,7 @@ class ReplicaManager(val config: KafkaConfig,
       }
 
       partitionsToMakeFollower.foreach { partition =>
-        val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
+        val topicPartitionOperationKey = TopicPartitionOperationKey(partition.topicPartition)
         delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
         delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
       }
@@ -1512,7 +1513,7 @@ class ReplicaManager(val config: KafkaConfig,
     delayedFetchPurgatory.shutdown()
     delayedProducePurgatory.shutdown()
     delayedDeleteRecordsPurgatory.shutdown()
-    delayedElectPreferredLeaderPurgatory.shutdown()
+    delayedElectLeaderPurgatory.shutdown()
     if (checkpointHW)
       checkpointHighWatermarks()
     info("Shut down completely")
@@ -1546,29 +1547,45 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def electPreferredLeaders(controller: KafkaController,
-                            partitions: Set[TopicPartition],
-                            responseCallback: Map[TopicPartition, ApiError] => Unit,
-                            requestTimeout: Long): Unit = {
+  def electLeaders(
+    controller: KafkaController,
+    partitions: Set[TopicPartition],
+    electionType: ElectionType,
+    responseCallback: Map[TopicPartition, ApiError] => Unit,
+    requestTimeout: Int
+  ): Unit = {
 
     val deadline = time.milliseconds() + requestTimeout
 
-    def electionCallback(expectedLeaders: Map[TopicPartition, Int],
-                         results: Map[TopicPartition, ApiError]): Unit = {
+    def electionCallback(results: Map[TopicPartition, Either[ApiError, Int]]): Unit = {
+      val expectedLeaders = mutable.Map.empty[TopicPartition, Int]
+      val failures = mutable.Map.empty[TopicPartition, ApiError]
+      results.foreach {
+        case (partition, Right(leader)) => expectedLeaders += partition -> leader
+        case (partition, Left(error)) => failures += partition -> error
+      }
+
       if (expectedLeaders.nonEmpty) {
-        val watchKeys = expectedLeaders.map{
-          case (tp, _) => new TopicPartitionOperationKey(tp)
-        }.toSeq
-        delayedElectPreferredLeaderPurgatory.tryCompleteElseWatch(
-          new DelayedElectPreferredLeader(deadline - time.milliseconds(), expectedLeaders, results,
-            this, responseCallback),
-          watchKeys)
+        val watchKeys: Seq[TopicPartitionOperationKey] = expectedLeaders.map{
+          case (tp, _) => TopicPartitionOperationKey(tp)
+        }(breakOut)
+
+        delayedElectLeaderPurgatory.tryCompleteElseWatch(
+          new DelayedElectLeader(
+            math.max(0, deadline - time.milliseconds()),
+            expectedLeaders,
+            failures,
+            this,
+            responseCallback
+          ),
+          watchKeys
+        )
       } else {
           // There are no partitions actually being elected, so return immediately
-          responseCallback(results)
+          responseCallback(failures)
       }
     }
 
-    controller.electPreferredLeaders(partitions, electionCallback)
+    controller.electLeaders(partitions, electionType, electionCallback)
   }
 }
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 782ec2a..a41eda2 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -29,18 +29,18 @@ import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.ConfigType
 import kafka.utils.Logging
 import kafka.zookeeper._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.resource.PatternType
 import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.resource.PatternType
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Seq, mutable}
+import scala.collection.Seq
+import scala.collection.breakOut
+import scala.collection.mutable
 
 /**
  * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@@ -250,10 +250,11 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    * @return UpdateLeaderAndIsrResult instance containing per partition results.
    */
-  def updateLeaderAndIsr(leaderAndIsrs: Map[TopicPartition, LeaderAndIsr], controllerEpoch: Int, expectedControllerEpochZkVersion: Int): UpdateLeaderAndIsrResult = {
-    val successfulUpdates = mutable.Map.empty[TopicPartition, LeaderAndIsr]
-    val updatesToRetry = mutable.Buffer.empty[TopicPartition]
-    val failed = mutable.Map.empty[TopicPartition, Exception]
+  def updateLeaderAndIsr(
+    leaderAndIsrs: Map[TopicPartition, LeaderAndIsr],
+    controllerEpoch: Int,
+    expectedControllerEpochZkVersion: Int
+  ): UpdateLeaderAndIsrResult = {
     val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) =>
       partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
     }
@@ -262,20 +263,26 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     } catch {
       case e: ControllerMovedException => throw e
       case e: Exception =>
-        leaderAndIsrs.keys.foreach(partition => failed.put(partition, e))
-        return UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
+        return UpdateLeaderAndIsrResult(leaderAndIsrs.keys.map(_ -> Left(e))(breakOut), Seq.empty)
     }
-    setDataResponses.foreach { setDataResponse =>
+
+    val updatesToRetry = mutable.Buffer.empty[TopicPartition]
+    val finished: Map[TopicPartition, Either[Exception, LeaderAndIsr]] = setDataResponses.flatMap { setDataResponse =>
       val partition = setDataResponse.ctx.get.asInstanceOf[TopicPartition]
       setDataResponse.resultCode match {
         case Code.OK =>
           val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion)
-          successfulUpdates.put(partition, updatedLeaderAndIsr)
-        case Code.BADVERSION => updatesToRetry += partition
-        case _ => failed.put(partition, setDataResponse.resultException.get)
+          Some(partition -> Right(updatedLeaderAndIsr))
+        case Code.BADVERSION =>
+          // Update the buffer for partitions to retry
+          updatesToRetry += partition
+          None
+        case _ =>
+          Some(partition -> Left(setDataResponse.resultException.get))
       }
-    }
-    UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
+    }(breakOut)
+
+    UpdateLeaderAndIsrResult(finished, updatesToRetry)
   }
 
   /**
@@ -286,8 +293,10 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    *         1. The successfully gathered log configs
    *         2. Exceptions corresponding to failed log config lookups.
    */
-  def getLogConfigs(topics: Seq[String], config: java.util.Map[String, AnyRef]):
-  (Map[String, LogConfig], Map[String, Exception]) = {
+  def getLogConfigs(
+    topics: Set[String],
+    config: java.util.Map[String, AnyRef]
+  ): (Map[String, LogConfig], Map[String, Exception]) = {
     val logConfigs = mutable.Map.empty[String, LogConfig]
     val failed = mutable.Map.empty[String, Exception]
     val configResponses = try {
@@ -453,11 +462,11 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * Gets all topics in the cluster.
    * @return sequence of topics in the cluster.
    */
-  def getAllTopicsInCluster: Seq[String] = {
+  def getAllTopicsInCluster: Set[String] = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path))
     getChildrenResponse.resultCode match {
-      case Code.OK => getChildrenResponse.children
-      case Code.NONODE => Seq.empty
+      case Code.OK => getChildrenResponse.children.toSet
+      case Code.NONODE => Set.empty
       case _ => throw getChildrenResponse.resultException.get
     }
 
@@ -1627,10 +1636,11 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     retryRequestsUntilConnected(createRequests, expectedControllerEpochZkVersion)
   }
 
-  private def getTopicConfigs(topics: Seq[String]): Seq[GetDataResponse] = {
-    val getDataRequests = topics.map { topic =>
+  private def getTopicConfigs(topics: Set[String]): Seq[GetDataResponse] = {
+    val getDataRequests: Seq[GetDataRequest] = topics.map { topic =>
       GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic))
-    }
+    }(breakOut)
+
     retryRequestsUntilConnected(getDataRequests)
   }
 
@@ -1654,8 +1664,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
-    val remainingRequests = ArrayBuffer(requests: _*)
-    val responses = new ArrayBuffer[Req#Response]
+    val remainingRequests = mutable.ArrayBuffer(requests: _*)
+    val responses = new mutable.ArrayBuffer[Req#Response]
     while (remainingRequests.nonEmpty) {
       val batchResponses = zooKeeperClient.handleRequests(remainingRequests)
 
@@ -1798,15 +1808,16 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 object KafkaZkClient {
 
   /**
-   * @param successfulPartitions The successfully updated partition states with adjusted znode versions.
+   * @param finishedPartitions Partitions that finished, either with a successfully
+   *                      updated partition state or with an exception.
    * @param partitionsToRetry The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts
    *                      can occur if the partition leader updated partition state while the controller attempted to
    *                      update partition state.
-   * @param failedPartitions Exceptions corresponding to failed partition state updates.
    */
-  case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicPartition, LeaderAndIsr],
-                                      partitionsToRetry: Seq[TopicPartition],
-                                      failedPartitions: Map[TopicPartition, Exception])
+  case class UpdateLeaderAndIsrResult(
+    finishedPartitions: Map[TopicPartition, Either[Exception, LeaderAndIsr]],
+    partitionsToRetry: Seq[TopicPartition]
+  )
 
   /**
    * Create an instance of this class with the provided parameters.
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 46cd318..66689e4 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -35,7 +35,10 @@ import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, TopicPartitionReplica}
+import org.apache.kafka.common.ConsumerGroupState
+import org.apache.kafka.common.ElectionType
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.TopicPartitionReplica
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
@@ -1295,7 +1298,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       zkClient.createPartitionReassignment(m)
       TestUtils.waitUntilTrue(
         () => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred,
-        s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}", 10000)
+        s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}",
+        10000)
       // Check the leader hasn't moved
       assertEquals(prior1, currentLeader(partition1))
       assertEquals(prior2, currentLeader(partition2))
@@ -1306,48 +1310,44 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertEquals(0, currentLeader(partition2))
 
     // Noop election
-    var electResult = client.electPreferredLeaders(asList(partition1))
-    electResult.partitionResult(partition1).get()
+    var electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
+    var exception = electResult.partitions.get.get(partition1).get
+    assertEquals(classOf[ElectionNotNeededException], exception.getClass)
+    assertEquals("Leader election not needed for topic partition", exception.getMessage)
     assertEquals(0, currentLeader(partition1))
 
     // Noop election with null partitions
-    electResult = client.electPreferredLeaders(null)
-    electResult.partitionResult(partition1).get()
+    electResult = client.electLeaders(ElectionType.PREFERRED, null)
+    assertTrue(electResult.partitions.get.isEmpty)
     assertEquals(0, currentLeader(partition1))
-    electResult.partitionResult(partition2).get()
     assertEquals(0, currentLeader(partition2))
 
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
 
     // meaningful election
-    electResult = client.electPreferredLeaders(asList(partition1))
-    assertEquals(Set(partition1).asJava, electResult.partitions.get)
-    electResult.partitionResult(partition1).get()
+    electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava)
+    assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
+    assertFalse(electResult.partitions.get.get(partition1).isPresent)
     waitForLeaderToBecome(partition1, 1)
 
     // topic 2 unchanged
-    var e = intercept[ExecutionException](electResult.partitionResult(partition2).get()).getCause
-    assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass)
-    assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\" was not attempted",
-      e.getMessage)
+    assertFalse(electResult.partitions.get.containsKey(partition2))
     assertEquals(0, currentLeader(partition2))
 
     // meaningful election with null partitions
-    electResult = client.electPreferredLeaders(null)
-    assertEquals(Set(partition1, partition2), electResult.partitions.get.asScala.filterNot(_.topic == "__consumer_offsets"))
-    electResult.partitionResult(partition1).get()
-    waitForLeaderToBecome(partition1, 1)
-    electResult.partitionResult(partition2).get()
+    electResult = client.electLeaders(ElectionType.PREFERRED, null)
+    assertEquals(Set(partition2), electResult.partitions.get.keySet.asScala)
+    assertFalse(electResult.partitions.get.get(partition2).isPresent)
     waitForLeaderToBecome(partition2, 1)
 
     // unknown topic
     val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
-    electResult = client.electPreferredLeaders(asList(unknownPartition))
-    assertEquals(Set(unknownPartition).asJava, electResult.partitions.get)
-    e = intercept[ExecutionException](electResult.partitionResult(unknownPartition).get()).getCause
-    assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass)
-    assertEquals("The partition does not exist.", e.getMessage)
+    electResult = client.electLeaders(ElectionType.PREFERRED, Set(unknownPartition).asJava)
+    assertEquals(Set(unknownPartition).asJava, electResult.partitions.get.keySet)
+    exception = electResult.partitions.get.get(unknownPartition).get
+    assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
+    assertEquals("The partition does not exist.", exception.getMessage)
     assertEquals(1, currentLeader(partition1))
     assertEquals(1, currentLeader(partition2))
 
@@ -1355,18 +1355,18 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     changePreferredLeader(prefer2)
 
     // mixed results
-    electResult = client.electPreferredLeaders(asList(unknownPartition, partition1))
-    assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get)
+    electResult = client.electLeaders(ElectionType.PREFERRED, Set(unknownPartition, partition1).asJava)
+    assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get.keySet)
     waitForLeaderToBecome(partition1, 2)
     assertEquals(1, currentLeader(partition2))
-    e = intercept[ExecutionException](electResult.partitionResult(unknownPartition).get()).getCause
-    assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass)
-    assertEquals("The partition does not exist.", e.getMessage)
-
-    // dupe partitions
-    electResult = client.electPreferredLeaders(asList(partition2, partition2))
-    assertEquals(Set(partition2).asJava, electResult.partitions.get)
-    electResult.partitionResult(partition2).get()
+    exception = electResult.partitions.get.get(unknownPartition).get
+    assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass)
+    assertEquals("The partition does not exist.", exception.getMessage)
+
+    // elect preferred leader for partition 2
+    electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition2).asJava)
+    assertEquals(Set(partition2).asJava, electResult.partitions.get.keySet)
+    assertFalse(electResult.partitions.get.get(partition2).isPresent)
     waitForLeaderToBecome(partition2, 2)
 
     // Now change the preferred leader to 1
@@ -1374,34 +1374,32 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     // but shut it down...
     servers(1).shutdown()
     waitUntilTrue (() => {
-      val description = client.describeTopics(Set(partition1.topic, partition2.topic).asJava).all().get()
+      val description = client.describeTopics(Set(partition1.topic, partition2.topic).asJava).all.get
       val isr = description.asScala.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
       !isr.exists(_.id == 1)
     }, "Expect broker 1 to no longer be in any ISR")
 
     // ... now what happens if we try to elect the preferred leader and it's down?
-    val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000)
-    electResult = client.electPreferredLeaders(asList(partition1), shortTimeout)
-    assertEquals(Set(partition1).asJava, electResult.partitions.get)
-    e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause
-    assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
-    assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains(
+    val shortTimeout = new ElectLeadersOptions().timeoutMs(10000)
+    electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava, shortTimeout)
+    assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet)
+    exception = electResult.partitions.get.get(partition1).get
+    assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
+    assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
       "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
     assertEquals(2, currentLeader(partition1))
 
     // preferred leader unavailable with null argument
-    electResult = client.electPreferredLeaders(null, shortTimeout)
-    e = intercept[ExecutionException](electResult.partitions.get()).getCause
-    assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
+    electResult = client.electLeaders(ElectionType.PREFERRED, null, shortTimeout)
 
-    e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause
-    assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
-    assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains(
+    exception = electResult.partitions.get.get(partition1).get
+    assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
+    assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
       "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
 
-    e = intercept[ExecutionException](electResult.partitionResult(partition2).get()).getCause
-    assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
-    assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains(
+    exception = electResult.partitions.get.get(partition2).get
+    assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass)
+    assertTrue(s"Wrong message ${exception.getMessage}", exception.getMessage.contains(
       "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
 
     assertEquals(2, currentLeader(partition1))
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 011a19b..d7823ac 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -29,14 +29,25 @@ import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConf
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.message._
+import org.apache.kafka.common.message.ControlledShutdownRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.DeleteTopicsRequestData
+import org.apache.kafka.common.message.DescribeGroupsRequestData
+import org.apache.kafka.common.message.FindCoordinatorRequestData
+import org.apache.kafka.common.message.HeartbeatRequestData
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection}
+import org.apache.kafka.common.message.JoinGroupRequestData
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
+import org.apache.kafka.common.message.LeaveGroupRequestData
+import org.apache.kafka.common.message.OffsetCommitRequestData
+import org.apache.kafka.common.message.SyncGroupRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, RecordBatch, SimpleRecord}
@@ -151,7 +162,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse],
       ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse],
       ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse],
-      ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse],
+      ApiKeys.ELECT_LEADERS -> classOf[ElectLeadersResponse],
       ApiKeys.INCREMENTAL_ALTER_CONFIGS -> classOf[IncrementalAlterConfigsResponse]
     )
 
@@ -198,8 +209,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
       if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED),
     ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error),
-    ApiKeys.ELECT_PREFERRED_LEADERS -> ((resp: ElectPreferredLeadersResponse) =>
-      ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error()),
+    ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data().errorCode())),
     ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) =>
       IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error)
   )
@@ -240,7 +250,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl,
     ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl,
     ApiKeys.CREATE_PARTITIONS -> topicAlterAcl,
-    ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl,
+    ApiKeys.ELECT_LEADERS -> clusterAlterAcl,
     ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl
   )
 
@@ -464,8 +474,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
 
-  private def electPreferredLeadersRequest = new ElectPreferredLeadersRequest.Builder(
-    ElectPreferredLeadersRequest.toRequestData(Collections.singleton(tp), 10000)).build()
+  private def electLeadersRequest = new ElectLeadersRequest.Builder(
+    ElectionType.PREFERRED,
+    Collections.singleton(tp),
+    10000
+  ).build()
 
   @Test
   def testAuthorizationWithTopicExisting() {
@@ -501,7 +514,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
       // Check StopReplica last since some APIs depend on replica availability
       ApiKeys.STOP_REPLICA -> stopReplicaRequest,
-      ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest,
+      ApiKeys.ELECT_LEADERS -> electLeadersRequest,
       ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest
     )
 
@@ -549,7 +562,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
       ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
       ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
-      ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest
+      ApiKeys.ELECT_LEADERS -> electLeadersRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala
deleted file mode 100644
index cf752b8..0000000
--- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import java.util.Properties
-
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{Logging, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
-import org.junit.Assert.assertEquals
-import org.junit.{After, Test}
-
-import scala.collection.{Map, Set}
-
-class PreferredReplicaElectionCommandTest extends ZooKeeperTestHarness with Logging {
-  var servers: Seq[KafkaServer] = Seq()
-
-  @After
-  override def tearDown() {
-    TestUtils.shutdownServers(servers)
-    super.tearDown()
-  }
-
-  @Test
-  def testPreferredReplicaJsonData() {
-    // write preferred replica json data to zk path
-    val partitionsForPreferredReplicaElection = Set(new TopicPartition("test", 1), new TopicPartition("test2", 1))
-    PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
-    // try to read it back and compare with what was written
-    val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
-    assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
-      partitionsUndergoingPreferredReplicaElection)
-  }
-
-  @Test
-  def testBasicPreferredReplicaElection() {
-    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
-    val topic = "test"
-    val partition = 0
-    val preferredReplica = 0
-    // create brokers
-    val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
-    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
-    // create the topic
-    adminZkClient.createTopicWithAssignment(topic, config = new Properties, expectedReplicaAssignment)
-    servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
-    // broker 2 should be the leader since it was started first
-    val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None)
-    // trigger preferred replica election
-    val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(new TopicPartition(topic, partition)))
-    preferredReplicaElection.moveLeaderToPreferredReplica()
-    val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader))
-    assertEquals("Preferred replica election failed", preferredReplica, newLeader)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
index 96e7dac..7ee2c59 100644
--- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
@@ -28,7 +28,10 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{Logging, TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, PreferredLeaderNotAvailableException, TimeoutException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException
+import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 import org.apache.kafka.common.network.ListenerName
 import org.junit.Assert._
 import org.junit.{After, Test}
@@ -65,8 +68,15 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
       Map(tp -> assigment))
     }
     // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.forall(server => partitionsAndAssignments.forall(partitionAndAssignment => server.getLogManager().getLog(partitionAndAssignment._1).isDefined)),
-      "Replicas for topic test not created")
+    TestUtils.waitUntilTrue(
+      () =>
+        servers.forall { server =>
+          partitionsAndAssignments.forall { partitionAndAssignment =>
+            server.getLogManager().getLog(partitionAndAssignment._1).isDefined
+          }
+        },
+      "Replicas for topic test not created"
+    )
   }
 
   /** Bounce the given targetServer and wait for all servers to get metadata for the given partition */
@@ -291,8 +301,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
       fail();
     } catch {
       case e: AdminCommandFailedException =>
-        assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
-        assertTrue(e.getSuppressed()(0).getMessage.contains("Timed out waiting for a node assignment"))
+        assertEquals("Timeout waiting for election results", e.getMessage)
         // Check we still have the same leader
         assertEquals(leader, getLeader(testPartition))
     } finally {
@@ -318,8 +327,8 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
       fail();
     } catch {
       case e: AdminCommandFailedException =>
-        assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
-        assertTrue(e.getSuppressed()(0).isInstanceOf[ClusterAuthorizationException])
+        assertEquals("Not authorized to perform leader election", e.getMessage)
+        assertTrue(e.getCause().isInstanceOf[ClusterAuthorizationException])
         // Check we still have the same leader
         assertEquals(leader, getLeader(testPartition))
     } finally {
@@ -327,6 +336,37 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
     }
   }
 
+  @Test
+  def testPreferredReplicaJsonData() {
+    // write preferred replica json data to zk path
+    val partitionsForPreferredReplicaElection = Set(new TopicPartition("test", 1), new TopicPartition("test2", 1))
+    PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
+    // try to read it back and compare with what was written
+    val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
+    assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
+      partitionsUndergoingPreferredReplicaElection)
+  }
+
+  @Test
+  def testBasicPreferredReplicaElection() {
+    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
+    val topic = "test"
+    val partition = 0
+    val preferredReplica = 0
+    // create brokers
+    val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
+    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
+    // create the topic
+    adminZkClient.createTopicWithAssignment(topic, config = new Properties, expectedReplicaAssignment)
+    servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
+    // broker 2 should be the leader since it was started first
+    val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None)
+    // trigger preferred replica election
+    val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(new TopicPartition(topic, partition)))
+    preferredReplicaElection.moveLeaderToPreferredReplica()
+    val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader))
+    assertEquals("Preferred replica election failed", preferredReplica, newLeader)
+  }
 }
 
 class PreferredReplicaLeaderElectionCommandTestAuthorizer extends SimpleAclAuthorizer {
diff --git a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
index 2578199..0e2c8be 100644
--- a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
@@ -16,19 +16,21 @@
  */
 package kafka.controller
 
+import kafka.api.LeaderAndIsr
 import kafka.common.StateChangeFailedException
 import kafka.controller.Election._
 import org.apache.kafka.common.TopicPartition
-
-import scala.collection.mutable
+import scala.collection.breakOut
 
 class MockPartitionStateMachine(controllerContext: ControllerContext,
                                 uncleanLeaderElectionEnabled: Boolean)
   extends PartitionStateMachine(controllerContext) {
 
-  override def handleStateChanges(partitions: Seq[TopicPartition],
-                                  targetState: PartitionState,
-                                  leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = {
+  override def handleStateChanges(
+    partitions: Seq[TopicPartition],
+    targetState: PartitionState,
+    leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]
+  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
     partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
     val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
     if (invalidPartitions.nonEmpty) {
@@ -47,13 +49,13 @@ class MockPartitionStateMachine(controllerContext: ControllerContext,
         controllerContext.putPartitionState(partition, targetState)
       }
 
-      val failedElections = doLeaderElections(partitionsToElectLeader, leaderElectionStrategy.get)
-      val successfulElections = partitionsToElectLeader.filterNot(failedElections.keySet.contains)
-      successfulElections.foreach { partition =>
-        controllerContext.putPartitionState(partition, targetState)
+      val electionResults = doLeaderElections(partitionsToElectLeader, leaderElectionStrategy.get)
+      electionResults.foreach {
+        case (partition, Right(_)) => controllerContext.putPartitionState(partition, targetState)
+        case (_, Left(_)) => // Ignore; No need to update the context if the election failed
       }
 
-      failedElections
+      electionResults
     } else {
       validPartitions.foreach { partition =>
         controllerContext.putPartitionState(partition, targetState)
@@ -62,9 +64,10 @@ class MockPartitionStateMachine(controllerContext: ControllerContext,
     }
   }
 
-  private def doLeaderElections(partitions: Seq[TopicPartition],
-                                leaderElectionStrategy: PartitionLeaderElectionStrategy): Map[TopicPartition, Throwable] = {
-    val failedElections = mutable.Map.empty[TopicPartition, Exception]
+  private def doLeaderElections(
+    partitions: Seq[TopicPartition],
+    leaderElectionStrategy: PartitionLeaderElectionStrategy
+  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
     val leaderIsrAndControllerEpochPerPartition = partitions.map { partition =>
       partition -> controllerContext.partitionLeadershipInfo(partition)
     }
@@ -72,17 +75,19 @@ class MockPartitionStateMachine(controllerContext: ControllerContext,
     val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
       leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
     }
-    invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
+
+    val failedElections = invalidPartitionsForElection.map { case (partition, leaderIsrAndControllerEpoch) =>
       val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
         s"already written by another controller. This probably means that the current controller went through " +
         s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
-      failedElections.put(partition, new StateChangeFailedException(failMsg))
+
+      partition -> Left(new StateChangeFailedException(failMsg))
     }
 
     val electionResults = leaderElectionStrategy match {
-      case OfflinePartitionLeaderElectionStrategy =>
+      case OfflinePartitionLeaderElectionStrategy(isUnclean) =>
         val partitionsWithUncleanLeaderElectionState = validPartitionsForElection.map { case (partition, leaderIsrAndControllerEpoch) =>
-          (partition, Some(leaderIsrAndControllerEpoch), uncleanLeaderElectionEnabled)
+          (partition, Some(leaderIsrAndControllerEpoch), isUnclean || uncleanLeaderElectionEnabled)
         }
         leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState)
       case ReassignPartitionLeaderElectionStrategy =>
@@ -93,18 +98,22 @@ class MockPartitionStateMachine(controllerContext: ControllerContext,
         leaderForControlledShutdown(controllerContext, validPartitionsForElection)
     }
 
-    for (electionResult <- electionResults) {
+    val results: Map[TopicPartition, Either[Exception, LeaderAndIsr]] = electionResults.map { electionResult =>
       val partition = electionResult.topicPartition
-      electionResult.leaderAndIsr match {
+      val value = electionResult.leaderAndIsr match {
         case None =>
           val failMsg = s"Failed to elect leader for partition $partition under strategy $leaderElectionStrategy"
-          failedElections.put(partition, new StateChangeFailedException(failMsg))
+          Left(new StateChangeFailedException(failMsg))
         case Some(leaderAndIsr) =>
           val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
           controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+          Right(leaderAndIsr)
       }
-    }
-    failedElections.toMap
+
+      partition -> value
+    }(breakOut)
+
+    results ++ failedElections
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 1d83470..9b159fb 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -20,8 +20,8 @@ import kafka.api.LeaderAndIsr
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zookeeper._
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.KeeperException.Code
@@ -30,6 +30,7 @@ import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.mockito.Mockito
+import scala.collection.breakOut
 
 class PartitionStateMachineTest {
   private var controllerContext: ControllerContext = null
@@ -65,7 +66,11 @@ class PartitionStateMachineTest {
 
   @Test
   def testInvalidNonexistentPartitionToOnlinePartitionTransition(): Unit = {
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    partitionStateMachine.handleStateChanges(
+      partitions,
+      OnlinePartition,
+      Option(OfflinePartitionLeaderElectionStrategy(false))
+    )
     assertEquals(NonExistentPartition, partitionState(partition))
   }
 
@@ -88,7 +93,11 @@ class PartitionStateMachineTest {
       partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    partitionStateMachine.handleStateChanges(
+      partitions,
+      OnlinePartition,
+      Option(OfflinePartitionLeaderElectionStrategy(false))
+    )
     EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlinePartition, partitionState(partition))
   }
@@ -104,7 +113,11 @@ class PartitionStateMachineTest {
       .andThrow(new ZooKeeperClientException("test"))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    partitionStateMachine.handleStateChanges(
+      partitions,
+      OnlinePartition,
+      Option(OfflinePartitionLeaderElectionStrategy(false))
+    )
     EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(NewPartition, partitionState(partition))
   }
@@ -120,7 +133,11 @@ class PartitionStateMachineTest {
       .andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), null, ResponseMetadata(0, 0))))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    partitionStateMachine.handleStateChanges(
+      partitions,
+      OnlinePartition,
+      Option(OfflinePartitionLeaderElectionStrategy(false))
+    )
     EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(NewPartition, partitionState(partition))
   }
@@ -157,7 +174,7 @@ class PartitionStateMachineTest {
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
-      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
@@ -190,7 +207,7 @@ class PartitionStateMachineTest {
     val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId))
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
-      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))
 
     // The leaderAndIsr request should be sent to both brokers, including the shutting down one
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId, otherBrokerId),
@@ -240,18 +257,89 @@ class PartitionStateMachineTest {
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))
 
-    EasyMock.expect(mockZkClient.getLogConfigs(Seq.empty, config.originals()))
+    EasyMock.expect(mockZkClient.getLogConfigs(Set.empty, config.originals()))
       .andReturn((Map(partition.topic -> LogConfig()), Map.empty))
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
-      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    partitionStateMachine.handleStateChanges(
+      partitions,
+      OnlinePartition,
+      Option(OfflinePartitionLeaderElectionStrategy(false))
+    )
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
+    assertEquals(OnlinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testOfflinePartitionToUncleanOnlinePartitionTransition(): Unit = {
+    /* Starting scenario: Leader: X, Isr: [X], Replicas: [X, Y], LiveBrokers: [Y]
+     * Ending scenario: Leader: Y, Isr: [Y], Replicas: [X, Y], LiveBrokers: [Y]
+     *
+     * For the given starting scenario verify that performing an unclean leader
+     * election on the offline partition results in the first live broker getting
+     * elected.
+     */
+    val leaderBrokerId = brokerId + 1
+    controllerContext.setLiveBrokerAndEpochs(Map(TestUtils.createBrokerAndEpoch(brokerId, "host", 0)))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(leaderBrokerId, brokerId))
+    controllerContext.putPartitionState(partition, OfflinePartition)
+
+    val leaderAndIsr = LeaderAndIsr(leaderBrokerId, List(leaderBrokerId))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock
+      .expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
+      .andReturn(
+        Seq(
+          GetDataResponse(
+            Code.OK,
+            null,
+            Option(partition),
+            TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch),
+            new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
+            ResponseMetadata(0, 0)
+          )
+        )
+      )
+
+    val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId))
+    val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
+
+    EasyMock
+      .expect(
+        mockZkClient.updateLeaderAndIsr(
+          Map(partition -> leaderAndIsrAfterElection),
+          controllerEpoch,
+          controllerContext.epochZkVersion
+        )
+      )
+      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))
+    EasyMock.expect(
+      mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
+        Seq(brokerId),
+        partition,
+        LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
+        Seq(leaderBrokerId, brokerId),
+        false
+      )
+    )
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
+
+    partitionStateMachine.handleStateChanges(
+      partitions,
+      OnlinePartition,
+      Option(OfflinePartitionLeaderElectionStrategy(true))
+    )
     EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlinePartition, partitionState(partition))
   }
@@ -272,7 +360,11 @@ class PartitionStateMachineTest {
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    partitionStateMachine.handleStateChanges(
+      partitions,
+      OnlinePartition,
+      Option(OfflinePartitionLeaderElectionStrategy(false))
+    )
     EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OfflinePartition, partitionState(partition))
   }
@@ -295,7 +387,11 @@ class PartitionStateMachineTest {
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    partitionStateMachine.handleStateChanges(
+      partitions,
+      OnlinePartition,
+      Option(OfflinePartitionLeaderElectionStrategy(false))
+    )
     EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OfflinePartition, partitionState(partition))
   }
@@ -327,18 +423,17 @@ class PartitionStateMachineTest {
     prepareMockToGetTopicPartitionsStatesRaw()
 
     def prepareMockToGetLogConfigs(): Unit = {
-      val topicsForPartitionsWithNoLiveInSyncReplicas = Seq()
-      EasyMock.expect(mockZkClient.getLogConfigs(topicsForPartitionsWithNoLiveInSyncReplicas, config.originals()))
+      EasyMock.expect(mockZkClient.getLogConfigs(Set.empty, config.originals()))
         .andReturn(Map.empty, Map.empty)
     }
     prepareMockToGetLogConfigs()
 
     def prepareMockToUpdateLeaderAndIsr(): Unit = {
-      val updatedLeaderAndIsr = partitions.map { partition =>
+      val updatedLeaderAndIsr: Map[TopicPartition, LeaderAndIsr] = partitions.map { partition =>
         partition -> leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId))
-      }.toMap
+      }(breakOut)
       EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr, controllerEpoch, controllerContext.epochZkVersion))
-        .andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr, Seq.empty, Map.empty))
+        .andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr.mapValues(Right(_)), Seq.empty))
     }
     prepareMockToUpdateLeaderAndIsr()
   }
@@ -366,7 +461,7 @@ class PartitionStateMachineTest {
     partitionStateMachine.handleStateChanges(partitions, OfflinePartition)
     assertEquals(s"There should be ${partitions.size} offline partition(s)", partitions.size, controllerContext.offlinePartitionCount)
 
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy))
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false)))
     assertEquals(s"There should be no offline partition(s)", 0, controllerContext.offlinePartitionCount)
   }
 
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 2420333..6afa5b6 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -222,7 +222,7 @@ class ReplicaStateMachineTest {
       Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch, controllerContext.epochZkVersion))
-      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+      .andReturn(UpdateLeaderAndIsrResult(Map(partition -> Right(updatedLeaderAndIsr)), Seq.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
       partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index d5becea..e695078 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -201,7 +201,7 @@ object AbstractCoordinatorConcurrencyTest {
           })
         }
       }
-      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+      val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
       watchKeys ++= producerRequestKeys
       producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
       tryCompleteDelayedRequests()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b7239e0..7c14cd2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -659,8 +659,8 @@ class ReplicaManagerTest {
       purgatoryName = "Fetch", timer, reaperEnabled = false)
     val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
       purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
-    val mockElectPreferredLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectPreferredLeader](
-      purgatoryName = "ElectPreferredLeader", timer, reaperEnabled = false)
+    val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
+      purgatoryName = "ElectLeader", timer, reaperEnabled = false)
 
     // Mock network client to show leader offset of 5
     val quota = QuotaFactory.instantiate(config, metrics, time, "")
@@ -669,7 +669,7 @@ class ReplicaManagerTest {
     val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr,
       new AtomicBoolean(false), quota, mockBrokerTopicStats,
       metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory,
-      mockDeleteRecordsPurgatory, mockElectPreferredLeaderPurgatory, Option(this.getClass.getName)) {
+      mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, Option(this.getClass.getName)) {
 
       override protected def createReplicaFetcherManager(metrics: Metrics,
                                                      time: Time,
@@ -819,13 +819,13 @@ class ReplicaManagerTest {
       purgatoryName = "Fetch", timer, reaperEnabled = false)
     val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
       purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
-    val mockDelayedElectPreferredLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectPreferredLeader](
-      purgatoryName = "DelayedElectPreferredLeader", timer, reaperEnabled = false)
+    val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
+      purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)
 
     new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
-      mockDeleteRecordsPurgatory, mockDelayedElectPreferredLeaderPurgatory, Option(this.getClass.getName))
+      mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, Option(this.getClass.getName))
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index e6cb0ec..c0d9b44 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -21,14 +21,27 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
-import org.apache.kafka.common.acl._
+import org.apache.kafka.common.ElectionType
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message._
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
-import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.message.ControlledShutdownRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.DeleteTopicsRequestData
+import org.apache.kafka.common.message.DescribeGroupsRequestData
+import org.apache.kafka.common.message.FindCoordinatorRequestData
+import org.apache.kafka.common.message.HeartbeatRequestData
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.InitProducerIdRequestData
+import org.apache.kafka.common.message.JoinGroupRequestData
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
+import org.apache.kafka.common.message.LeaveGroupRequestData
+import org.apache.kafka.common.message.OffsetCommitRequestData
+import org.apache.kafka.common.message.SaslAuthenticateRequestData
+import org.apache.kafka.common.message.SaslHandshakeRequestData
+import org.apache.kafka.common.message.SyncGroupRequestData
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
@@ -36,6 +49,7 @@ import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests._
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.common.utils.SecurityUtils
@@ -429,14 +443,12 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.DELETE_GROUPS =>
           new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))
 
-        case ApiKeys.ELECT_PREFERRED_LEADERS =>
-          val partition = new ElectPreferredLeadersRequestData.TopicPartitions()
-            .setPartitionId(Collections.singletonList(0))
-            .setTopic("my_topic")
-          new ElectPreferredLeadersRequest.Builder(
-            new ElectPreferredLeadersRequestData()
-                .setTimeoutMs(0)
-                .setTopicPartitions(Collections.singletonList(partition)))
+        case ApiKeys.ELECT_LEADERS =>
+          new ElectLeadersRequest.Builder(
+            ElectionType.PREFERRED,
+            Collections.singletonList(new TopicPartition("my_topic", 0)),
+            0
+          )
 
         case ApiKeys.INCREMENTAL_ALTER_CONFIGS =>
           new IncrementalAlterConfigsRequest.Builder(
@@ -542,7 +554,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
       case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
       case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs
-      case ApiKeys.ELECT_PREFERRED_LEADERS => new ElectPreferredLeadersResponse(response).throttleTimeMs
+      case ApiKeys.ELECT_LEADERS => new ElectLeadersResponse(response).throttleTimeMs
       case ApiKeys.INCREMENTAL_ALTER_CONFIGS =>
         new IncrementalAlterConfigsResponse(response, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion()).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 9557d15..5a4336b 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -159,7 +159,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
     // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
     val zkMock: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
     EasyMock.expect(zkMock.topicExists(topic)).andReturn(false)
-    EasyMock.expect(zkMock.getAllTopicsInCluster).andReturn(Seq("some.topic", topic, "some.other.topic"))
+    EasyMock.expect(zkMock.getAllTopicsInCluster).andReturn(Set("some.topic", topic, "some.other.topic"))
     EasyMock.replay(zkMock)
     val adminZkClient = new AdminZkClient(zkMock)
 
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 66a8114..5d8846e 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -648,19 +648,19 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     val emptyConfig = LogConfig(Collections.emptyMap())
     assertEquals("Non existent config, no defaults",
       (Map(topic1 -> emptyConfig), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
+      zkClient.getLogConfigs(Set(topic1), Collections.emptyMap()))
 
     val logProps2 = createLogProps(2048)
 
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
     assertEquals("One existing and one non-existent topic",
       (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+      zkClient.getLogConfigs(Set(topic1, topic2), Collections.emptyMap()))
 
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
     assertEquals("Two existing topics",
       (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+      zkClient.getLogConfigs(Set(topic1, topic2), Collections.emptyMap()))
 
     val logProps1WithMoreValues = createLogProps(1024)
     logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
@@ -668,7 +668,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertEquals("Config with defaults",
       (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1),
+      zkClient.getLogConfigs(Set(topic1),
         Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
   }
 
@@ -794,14 +794,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
                   expectedPartitionsToRetry: Seq[TopicPartition],
                   expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
                   actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
-    val failedPartitionsExcerpt =
-      actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
+    val failedPartitionsExcerpt = mutable.Map.empty[TopicPartition, (Class[_], String)]
+    val successfulPartitions = mutable.Map.empty[TopicPartition, LeaderAndIsr]
+
+    actualUpdateLeaderAndIsrResult.finishedPartitions.foreach {
+      case (partition, Left(e)) => failedPartitionsExcerpt += partition -> (e.getClass, e.getMessage)
+      case (partition, Right(leaderAndIsr)) => successfulPartitions += partition -> leaderAndIsr
+    }
+
     assertEquals("Permanently failed updates do not match expected",
       expectedFailedPartitions, failedPartitionsExcerpt)
     assertEquals("Retriable updates (due to BADVERSION) do not match expected",
       expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
     assertEquals("Successful updates do not match expected",
-      expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
+      expectedSuccessfulPartitions, successfulPartitions)
   }
 
   @Test
diff --git a/docs/upgrade.html b/docs/upgrade.html
index d9232fa..3c737ec 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -71,6 +71,12 @@
     </li>
 </ol>
 
+<h5><a id="upgrade_240_notable" href="#upgrade_240_notable">Notable changes in 2.4.0</a></h5>
+<ul>
+    <li>The <code>bin/kafka-preferred-replica-election.sh</code> command line tool has been deprecated. It has been replaced by <code>bin/kafka-leader-election.sh</code>.</li>
+    <li>The methods <code>electPreferredLeaders</code> in the Java <code>AdminClient</code> class have been deprecated in favor of the methods <code>electLeaders</code>.</li>
+</ul>
+
 <h5><a id="upgrade_230_notable" href="#upgrade_230_notable">Notable changes in 2.3.0</a></h5>
 <ul>
     <li>
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 721b05e..eeda703 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -100,8 +100,10 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
     </Match>
 
     <Match>
-        <!-- Add a suppression for the equals() method of NetworkClientBlockingOps. -->
-        <Class name="kafka.utils.NetworkClientBlockingOps"/>
+        <!-- Suppression for the equals() of extension methods. -->
+        <Or>
+            <Class name="kafka.api.package$ElectLeadersRequestOps"/>
+        </Or>
         <Bug pattern="EQ_UNUSUAL"/>
     </Match>
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 3aff0a2..d4d2e1c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -272,7 +272,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
      */
     public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
-        final List<String> topics = JavaConverters.seqAsJavaListConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava();
+        final Set<String> topics = JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava();
         for (final String topic : topics) {
             try {
                 brokers[0].deleteTopic(topic);
@@ -312,7 +312,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         @Override
         public boolean conditionMet() {
             final Set<String> allTopics = new HashSet<>(
-                    JavaConverters.seqAsJavaListConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava());
+                    JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava());
             return !allTopics.removeAll(deletedTopics);
         }
     }
@@ -326,8 +326,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
         @Override
         public boolean conditionMet() {
-            final Set<String> allTopics = new HashSet<>(
-                    JavaConverters.seqAsJavaListConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava());
+            final Set<String> allTopics = JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava();
             return allTopics.equals(remainingTopics);
         }
     }


Mime
View raw message