kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lind...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7439; Replace EasyMock and PowerMock with Mockito in clients module
Date Wed, 10 Oct 2018 18:00:42 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 0b298a5  KAFKA-7439; Replace EasyMock and PowerMock with Mockito in clients module
0b298a5 is described below

commit 0b298a5d05df4455c5a9b4bab736b0eb60689acc
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Tue Oct 9 15:55:09 2018 -0700

    KAFKA-7439; Replace EasyMock and PowerMock with Mockito in clients module
    
    Development of EasyMock and PowerMock has stagnated while Mockito
    continues to be actively developed. With the new Java release cadence,
    it's a problem to depend on libraries that do bytecode manipulation
    and are not actively maintained. In addition, Mockito is
    easier to use.
    
    While updating the tests, I attempted to go from a failing test to
    a passing test. In cases where the updated test passed on the first
    attempt, I artificially broke it to ensure the test was still doing its
    job.
    
    I included a few improvements that were helpful while making these
    changes:
    
    1. Better exception if there are no nodes in `leastLoadedNode`
    2. Always close the producer in `KafkaProducerTest`
    3. requestsInFlight producer metric should not hold a reference to
    `Sender`
    
    Finally, `Metadata` is no longer final so that we don't need
    `PowerMock` to mock it. It's an internal class, so it's OK.
    
    Author: Ismael Juma <ismael@juma.me.uk>
    
    Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Dong Lin <lindong28@gmail.com>
    
    Closes #5691 from ijuma/kafka-7438-mockito
---
 build.gradle                                       |   4 +-
 checkstyle/import-control.xml                      |   1 +
 .../java/org/apache/kafka/clients/Metadata.java    |   2 +-
 .../org/apache/kafka/clients/NetworkClient.java    |   2 +
 .../kafka/clients/producer/KafkaProducer.java      | 116 ++++----
 .../clients/producer/internals/BufferPool.java     |   7 +-
 .../kafka/clients/producer/internals/Sender.java   |  23 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  18 +-
 .../internals/ConsumerNetworkClientTest.java       |  57 ++--
 .../kafka/clients/producer/KafkaProducerTest.java  | 315 +++++++++++----------
 .../clients/producer/internals/BufferPoolTest.java |  47 +--
 .../clients/producer/internals/SenderTest.java     |  56 ++--
 .../kafka/common/network/ChannelBuildersTest.java  |   6 +-
 .../apache/kafka/common/network/SelectorTest.java  |  51 ++--
 .../kafka/common/record/FileRecordsTest.java       |  48 ++--
 .../auth/DefaultKafkaPrincipalBuilderTest.java     |  72 +++--
 .../authenticator/SaslServerAuthenticatorTest.java |  54 ++--
 .../oauthbearer/OAuthBearerLoginModuleTest.java    |  62 ++--
 .../internals/OAuthBearerSaslClientTest.java       |   3 +-
 .../ExpiringCredentialRefreshingLoginTest.java     | 108 ++++---
 .../org/apache/kafka/common/utils/UtilsTest.java   |  82 +++---
 gradle/dependencies.gradle                         |   2 +
 22 files changed, 537 insertions(+), 599 deletions(-)

diff --git a/build.gradle b/build.gradle
index 2651f10..ce1314c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -830,9 +830,7 @@ project(':clients') {
 
     testCompile libs.bcpkix
     testCompile libs.junit
-    testCompile libs.easymock
-    testCompile libs.powermockJunit4
-    testCompile libs.powermockEasymock
+    testCompile libs.mockitoCore
 
     testRuntime libs.slf4jlog4j
     testRuntime libs.jacksonDatabind
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index bd5c11f..7810a3e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -28,6 +28,7 @@
   <allow pkg="org.slf4j" />
   <allow pkg="org.junit" />
   <allow pkg="org.hamcrest" />
+  <allow pkg="org.mockito" />
   <allow pkg="org.easymock" />
   <allow pkg="org.powermock" />
   <allow pkg="java.security" />
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 1413eac..0abb5c4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -50,7 +50,7 @@ import java.util.Set;
  * is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly
  * manage topics while producers rely on topic expiry to limit the refresh set.
  */
-public final class Metadata implements Closeable {
+public class Metadata implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(Metadata.class);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index e4ba197..b2098bf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -595,6 +595,8 @@ public class NetworkClient implements KafkaClient {
     @Override
     public Node leastLoadedNode(long now) {
         List<Node> nodes = this.metadataUpdater.fetchNodes();
+        if (nodes.isEmpty())
+            throw new IllegalStateException("There are no nodes in the Kafka cluster");
         int inflight = Integer.MAX_VALUE;
         Node found = null;
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e249c12..316b024 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -252,7 +252,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final Serializer<V> valueSerializer;
     private final ProducerConfig producerConfig;
     private final long maxBlockTimeMs;
-    private final int requestTimeoutMs;
     private final ProducerInterceptors<K, V> interceptors;
     private final ApiVersions apiVersions;
     private final TransactionManager transactionManager;
@@ -269,7 +268,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *
      */
     public KafkaProducer(final Map<String, Object> configs) {
-        this(new ProducerConfig(configs), null, null, null, null);
+        this(new ProducerConfig(configs), null, null, null, null, null, Time.SYSTEM);
     }
 
     /**
@@ -287,10 +286,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
-            keySerializer,
-            valueSerializer,
-            null,
-            null);
+            keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
     }
 
     /**
@@ -301,7 +297,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param properties   The producer configs
      */
     public KafkaProducer(Properties properties) {
-        this(new ProducerConfig(properties), null, null, null, null);
+        this(new ProducerConfig(properties), null, null, null, null, null, Time.SYSTEM);
     }
 
     /**
@@ -317,20 +313,22 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
-                keySerializer, valueSerializer, null, null);
+                keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
     }
 
-    @SuppressWarnings("unchecked")
     // visible for testing
+    @SuppressWarnings("unchecked")
     KafkaProducer(ProducerConfig config,
                   Serializer<K> keySerializer,
                   Serializer<V> valueSerializer,
                   Metadata metadata,
-                  KafkaClient kafkaClient) {
+                  KafkaClient kafkaClient,
+                  ProducerInterceptors interceptors,
+                  Time time) {
         try {
             Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
-            this.time = Time.SYSTEM;
+            this.time = time;
             String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
@@ -356,7 +354,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
-            ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             if (keySerializer == null) {
@@ -378,20 +375,21 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
             // load interceptors and make sure they get clientId
             userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                    ProducerInterceptor.class);
-            this.interceptors = new ProducerInterceptors<>(interceptorList);
-            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
+            ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
+            List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
+                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
+            if (interceptors != null)
+                this.interceptors = interceptors;
+            else
+                this.interceptors = new ProducerInterceptors<>(interceptorList);
+            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
+                    valueSerializer, interceptorList, reporters);
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
 
             this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             this.transactionManager = configureTransactionState(config, logContext, log);
-            int retries = configureRetries(config, transactionManager != null, log);
-            int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
-            short acks = configureAcks(config, transactionManager != null, log);
             int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
 
             this.apiVersions = new ApiVersions();
@@ -413,44 +411,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             } else {
                 this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                     true, true, clusterResourceListeners);
-                this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
+                this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds());
             }
-            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
-            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
-            KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
-                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-                            this.metrics, time, "producer", channelBuilder, logContext),
-                    this.metadata,
-                    clientId,
-                    maxInflightRequests,
-                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
-                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
-                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
-                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
-                    this.requestTimeoutMs,
-                    time,
-                    true,
-                    apiVersions,
-                    throttleTimeSensor,
-                    logContext);
-            this.sender = new Sender(logContext,
-                    client,
-                    this.metadata,
-                    this.accumulator,
-                    maxInflightRequests == 1,
-                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
-                    acks,
-                    retries,
-                    metricsRegistry.senderMetrics,
-                    Time.SYSTEM,
-                    this.requestTimeoutMs,
-                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
-                    this.transactionManager,
-                    apiVersions);
+            this.errors = this.metrics.sensor("errors");
+            this.sender = newSender(logContext, kafkaClient, this.metadata);
             String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
-            this.errors = this.metrics.sensor("errors");
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
             log.debug("Kafka producer started");
@@ -462,6 +429,47 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
+    // visible for testing
+    Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
+        int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);
+        int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig);
+        ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
+        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
+        KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
+                new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+                        this.metrics, time, "producer", channelBuilder, logContext),
+                metadata,
+                clientId,
+                maxInflightRequests,
+                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
+                producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
+                producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
+                requestTimeoutMs,
+                time,
+                true,
+                apiVersions,
+                throttleTimeSensor,
+                logContext);
+        int retries = configureRetries(producerConfig, transactionManager != null, log);
+        short acks = configureAcks(producerConfig, transactionManager != null, log);
+        return new Sender(logContext,
+                client,
+                metadata,
+                this.accumulator,
+                maxInflightRequests == 1,
+                producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
+                acks,
+                retries,
+                metricsRegistry.senderMetrics,
+                time,
+                requestTimeoutMs,
+                producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
+                this.transactionManager,
+                apiVersions);
+    }
+
     private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
         int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
         int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index c5df2da..22d7472 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -135,7 +135,7 @@ public class BufferPool {
                         } finally {
                             long endWaitNs = time.nanoseconds();
                             timeNs = Math.max(0L, endWaitNs - startWaitNs);
-                            this.waitTime.record(timeNs, time.milliseconds());
+                            recordWaitTime(timeNs);
                         }
 
                         if (waitingTimeElapsed) {
@@ -185,6 +185,11 @@ public class BufferPool {
             return buffer;
     }
 
+    // Protected for testing
+    protected void recordWaitTime(long timeNs) {
+        this.waitTime.record(timeNs, time.milliseconds());
+    }
+
     /**
      * Allocate a buffer.  If buffer allocation fails (e.g. because of OOM) then return the size count back to
      * available memory and signal the next waiter if it exists.
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7640377..c50a85f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -40,8 +40,6 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -149,7 +147,7 @@ public class Sender implements Runnable {
         this.acks = acks;
         this.retries = retries;
         this.time = time;
-        this.sensors = new SenderMetrics(metricsRegistry);
+        this.sensors = new SenderMetrics(metricsRegistry, metadata, client, time);
         this.requestTimeoutMs = requestTimeoutMs;
         this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
@@ -811,7 +809,7 @@ public class Sender implements Runnable {
     /**
      * A collection of sensors for the sender
      */
-    private class SenderMetrics {
+    private static class SenderMetrics {
         public final Sensor retrySensor;
         public final Sensor errorSensor;
         public final Sensor queueTimeSensor;
@@ -822,9 +820,11 @@ public class Sender implements Runnable {
         public final Sensor maxRecordSizeSensor;
         public final Sensor batchSplitSensor;
         private final SenderMetricsRegistry metrics;
+        private final Time time;
 
-        public SenderMetrics(SenderMetricsRegistry metrics) {
+        public SenderMetrics(SenderMetricsRegistry metrics, Metadata metadata, KafkaClient client, Time time) {
             this.metrics = metrics;
+            this.time = time;
 
             this.batchSizeSensor = metrics.sensor("batch-size");
             this.batchSizeSensor.add(metrics.batchSizeAvg, new Avg());
@@ -855,16 +855,9 @@ public class Sender implements Runnable {
             this.maxRecordSizeSensor.add(metrics.recordSizeMax, new Max());
             this.maxRecordSizeSensor.add(metrics.recordSizeAvg, new Avg());
 
-            this.metrics.addMetric(metrics.requestsInFlight, new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return client.inFlightRequestCount();
-                }
-            });
-            metrics.addMetric(metrics.metadataAge, new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
-                }
-            });
+            this.metrics.addMetric(metrics.requestsInFlight, (config, now) -> client.inFlightRequestCount());
+            this.metrics.addMetric(metrics.metadataAge,
+                (config, now) -> (now - metadata.lastSuccessfulUpdate()) / 1000.0);
 
             this.batchSplitSensor = metrics.sensor("batch-split-rate");
             this.batchSplitSensor.add(new Meter(metrics.batchSplitRate, metrics.batchSplitTotal));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2a3cbe0..4ac5876 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -74,7 +74,6 @@ import org.apache.kafka.test.MockConsumerInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -115,6 +114,11 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 public class KafkaConsumerTest {
     private final String topic = "test";
@@ -1846,18 +1850,16 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    @SuppressWarnings("deprecation")
     public void testCloseWithTimeUnit() {
-        KafkaConsumer consumer = EasyMock.partialMockBuilder(KafkaConsumer.class)
-                .addMockedMethod("close", Duration.class).createMock();
-        consumer.close(Duration.ofSeconds(1));
-        EasyMock.expectLastCall();
-        EasyMock.replay(consumer);
+        KafkaConsumer consumer = mock(KafkaConsumer.class);
+        doCallRealMethod().when(consumer).close(anyLong(), any());
         consumer.close(1, TimeUnit.SECONDS);
-        EasyMock.verify(consumer);
+        verify(consumer).close(Duration.ofSeconds(1));
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void testSubscriptionOnInvalidTopic() throws Exception {
+    public void testSubscriptionOnInvalidTopic() {
         Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster();
         Node node = cluster.nodes().get(0);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 4494fd5..8f6328d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -32,16 +32,19 @@ import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkClientTest {
 
@@ -148,69 +151,41 @@ public class ConsumerNetworkClientTest {
 
     @Test
     public void doNotBlockIfPollConditionIsSatisfied() {
-        NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
+        NetworkClient mockNetworkClient = mock(NetworkClient.class);
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
                 mockNetworkClient, metadata, time, 100, 1000, Integer.MAX_VALUE);
 
         // expect poll, but with no timeout
-        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(0L), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
-
-        EasyMock.replay(mockNetworkClient);
-
-        consumerClient.poll(time.timer(Long.MAX_VALUE), new ConsumerNetworkClient.PollCondition() {
-            @Override
-            public boolean shouldBlock() {
-                return false;
-            }
-        });
-
-        EasyMock.verify(mockNetworkClient);
+        consumerClient.poll(time.timer(Long.MAX_VALUE), () -> false);
+        verify(mockNetworkClient).poll(eq(0L), anyLong());
     }
 
     @Test
     public void blockWhenPollConditionNotSatisfied() {
         long timeout = 4000L;
 
-        NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
+        NetworkClient mockNetworkClient = mock(NetworkClient.class);
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
                 mockNetworkClient, metadata, time, 100, 1000, Integer.MAX_VALUE);
 
-        EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(1);
-        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(timeout), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
-
-        EasyMock.replay(mockNetworkClient);
-
-        consumerClient.poll(time.timer(timeout), new ConsumerNetworkClient.PollCondition() {
-            @Override
-            public boolean shouldBlock() {
-                return true;
-            }
-        });
-
-        EasyMock.verify(mockNetworkClient);
+        when(mockNetworkClient.inFlightRequestCount()).thenReturn(1);
+        consumerClient.poll(time.timer(timeout), () -> true);
+        verify(mockNetworkClient).poll(eq(timeout), anyLong());
     }
 
     @Test
     public void blockOnlyForRetryBackoffIfNoInflightRequests() {
         long retryBackoffMs = 100L;
 
-        NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
+        NetworkClient mockNetworkClient = mock(NetworkClient.class);
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
                 mockNetworkClient, metadata, time, retryBackoffMs, 1000, Integer.MAX_VALUE);
 
-        EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0);
-        EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
+        when(mockNetworkClient.inFlightRequestCount()).thenReturn(0);
 
-        EasyMock.replay(mockNetworkClient);
-
-        consumerClient.poll(time.timer(Long.MAX_VALUE), new ConsumerNetworkClient.PollCondition() {
-            @Override
-            public boolean shouldBlock() {
-                return true;
-            }
-        });
+        consumerClient.poll(time.timer(Long.MAX_VALUE), () -> true);
 
-        EasyMock.verify(mockNetworkClient);
+        verify(mockNetworkClient).poll(eq(retryBackoffMs), anyLong());
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index dc6fe9f..77fcb51 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -20,9 +20,11 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
+import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -39,6 +41,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.MockMetricsReporter;
@@ -46,15 +49,8 @@ import org.apache.kafka.test.MockPartitioner;
 import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.support.membermodification.MemberModifier;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -66,14 +62,23 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*")
 public class KafkaProducerTest {
 
     @Test
@@ -209,7 +214,7 @@ public class KafkaProducerTest {
 
         final Producer<String, String> producer = new KafkaProducer<>(
             new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-            new StringSerializer(), new StringSerializer(), metadata, client);
+            new StringSerializer(), new StringSerializer(), metadata, client, null, time);
 
         ExecutorService executor = Executors.newSingleThreadExecutor();
         final AtomicReference<Exception> closeException = new AtomicReference<>();
@@ -271,17 +276,13 @@ public class KafkaProducerTest {
         new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    @PrepareOnlyThisForTest(Metadata.class)
     @Test
-    public void testMetadataFetch() throws Exception {
+    public void testMetadataFetch() throws InterruptedException {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
-        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
-        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
-
+        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
+                new StringSerializer()));
         String topic = "topic";
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
         Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
         final Cluster emptyCluster = new Cluster(null, nodes,
                 Collections.emptySet(),
@@ -293,42 +294,48 @@ public class KafkaProducerTest {
                 Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
                 Collections.emptySet(),
                 Collections.emptySet());
+        Metadata metadata = mock(Metadata.class);
+
+        // Return empty cluster 4 times and cluster from then on
+        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, cluster);
 
-        // Expect exactly one fetch for each attempt to refresh while topic metadata is not available
-        final int refreshAttempts = 5;
-        EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1);
-        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null,
+                metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
+            @Override
+            Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
+                // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
+                return super.newSender(logContext, kafkaClient, new Metadata(0, 100_000, true));
+            }
+        };
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
         producer.send(record);
-        PowerMock.verify(metadata);
 
-        // Expect exactly one fetch if topic metadata is available
-        PowerMock.reset(metadata);
-        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
+        // One request update for each empty cluster returned
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(5)).fetch();
+
+        // Should not request update for subsequent `send`
         producer.send(record, null);
-        PowerMock.verify(metadata);
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(6)).fetch();
 
-        // Expect exactly one fetch if topic metadata is available
-        PowerMock.reset(metadata);
-        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
+        // Should not request update for subsequent `partitionsFor`
         producer.partitionsFor(topic);
-        PowerMock.verify(metadata);
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(7)).fetch();
+
+        producer.close(0, TimeUnit.MILLISECONDS);
     }
 
-    @PrepareOnlyThisForTest(Metadata.class)
     @Test
     public void testMetadataFetchOnStaleMetadata() throws Exception {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
-        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
-        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
-
+        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
+                new StringSerializer()));
         String topic = "topic";
         ProducerRecord<String, String> initialRecord = new ProducerRecord<>(topic, "value");
         // Create a record with a partition higher than the initial (outdated) partition range
@@ -353,134 +360,145 @@ public class KafkaProducerTest {
                         new PartitionInfo(topic, 2, null, null, null)),
                 Collections.emptySet(),
                 Collections.emptySet());
+        Metadata metadata = mock(Metadata.class);
+
+        AtomicInteger invocationCount = new AtomicInteger(0);
+
+        // Return empty cluster 4 times, initialCluster 5 times and extendedCluster after that
+        when(metadata.fetch()).then(invocation -> {
+            invocationCount.incrementAndGet();
+            if (invocationCount.get() > 9)
+                return extendedCluster;
+            else if (invocationCount.get() > 4)
+                return initialCluster;
+            return emptyCluster;
+        });
 
-        // Expect exactly one fetch for each attempt to refresh while topic metadata is not available
-        final int refreshAttempts = 5;
-        EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1);
-        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config, null, null,
+                metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
+            @Override
+            Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
+                // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
+                return super.newSender(logContext, kafkaClient, new Metadata(0, 100_000, true));
+            }
+        };
         producer.send(initialRecord);
-        PowerMock.verify(metadata);
 
-        // Expect exactly one fetch if topic metadata is available and records are still within range
-        PowerMock.reset(metadata);
-        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
-        producer.send(initialRecord, null);
-        PowerMock.verify(metadata);
+        // One request update for each empty cluster returned
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(5)).fetch();
+
+        // Should not request update if metadata is available and records are within range
+        producer.send(initialRecord);
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(6)).fetch();
 
-        // Expect exactly two fetches if topic metadata is available but metadata response still returns
+        // One request update followed by exception if topic metadata is available but metadata response still returns
         // the same partition size (either because metadata are still stale at the broker too or because
-        // there weren't any partitions added in the first place).
-        PowerMock.reset(metadata);
-        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
-        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
+        // there weren't any partitions added in the first place)
         try {
-            producer.send(extendedRecord, null);
+            producer.send(extendedRecord);
             fail("Expected KafkaException to be raised");
         } catch (KafkaException e) {
             // expected
         }
-        PowerMock.verify(metadata);
-
-        // Expect exactly two fetches if topic metadata is available but outdated for the given record
-        PowerMock.reset(metadata);
-        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
-        EasyMock.expect(metadata.fetch()).andReturn(extendedCluster).once();
-        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
-        PowerMock.replay(metadata);
-        producer.send(extendedRecord, null);
-        PowerMock.verify(metadata);
+        verify(metadata, times(5)).requestUpdate();
+        verify(metadata, times(5)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(8)).fetch();
+
+        // One request update if metadata is available but outdated for the given record
+        producer.send(extendedRecord);
+        verify(metadata, times(6)).requestUpdate();
+        verify(metadata, times(6)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(10)).fetch();
+
+        producer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
-    public void testTopicRefreshInMetadata() throws Exception {
+    public void testTopicRefreshInMetadata() throws InterruptedException {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
+            new StringSerializer()));
         long refreshBackoffMs = 500L;
         long metadataExpireMs = 60000L;
         final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true,
                 true, new ClusterResourceListeners());
         final Time time = new MockTime();
-        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
-        MemberModifier.field(KafkaProducer.class, "time").set(producer, time);
         final String topic = "topic";
-
-        Thread t = new Thread(() -> {
-            long startTimeMs = System.currentTimeMillis();
-            for (int i = 0; i < 10; i++) {
-                while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 1000)
-                    Thread.yield();
-                metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds());
-                time.sleep(60 * 1000L);
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config, null, null, metadata,
+                null, null, time)) {
+
+            Thread t = new Thread(() -> {
+                long startTimeMs = System.currentTimeMillis();
+                for (int i = 0; i < 10; i++) {
+                    while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 1000)
+                        Thread.yield();
+                    metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds());
+                    time.sleep(60 * 1000L);
+                }
+            });
+            t.start();
+            try {
+                producer.partitionsFor(topic);
+                fail("Expect TimeoutException");
+            } catch (TimeoutException e) {
+                // skip
             }
-        });
-        t.start();
-        try {
-            producer.partitionsFor(topic);
-            fail("Expect TimeoutException");
-        } catch (TimeoutException e) {
-            // skip
+            t.join();
         }
-        Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
+        assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
     }
 
-    @SuppressWarnings("unchecked") // safe as generic parameters won't vary
-    @PrepareOnlyThisForTest(Metadata.class)
     @Test
-    public void testHeadersWithExtendedClasses() throws Exception {
+    @Deprecated
+    public void testHeadersWithExtendedClasses() {
         doTestHeaders(ExtendedSerializer.class);
     }
 
-    @SuppressWarnings("unchecked")
-    @PrepareOnlyThisForTest(Metadata.class)
     @Test
-    public void testHeaders() throws Exception {
+    public void testHeaders() {
         doTestHeaders(Serializer.class);
     }
 
-    private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) throws Exception {
+    private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        T keySerializer = PowerMock.createNiceMock(serializerClassToMock);
-        T valueSerializer = PowerMock.createNiceMock(serializerClassToMock);
-
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
-        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
-        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
+        @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
+        Serializer<String> keySerializer = mock(serializerClassToMock);
+        @SuppressWarnings("unchecked")
+        Serializer<String> valueSerializer = mock(serializerClassToMock);
+        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, keySerializer,
+                valueSerializer));
 
         String topic = "topic";
         final Cluster cluster = new Cluster(
                 "dummy",
                 Collections.singletonList(new Node(0, "host1", 1000)),
-                Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
+                Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])),
                 Collections.emptySet(),
                 Collections.emptySet());
+        Metadata metadata = new Metadata(0, 90000, true);
+        metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
 
+        KafkaProducer<String, String> producer = new KafkaProducer<>(config, keySerializer, valueSerializer,
+                metadata, null, null, Time.SYSTEM);
 
-        EasyMock.expect(metadata.fetch()).andReturn(cluster).anyTimes();
-
-        PowerMock.replay(metadata);
+        when(keySerializer.serialize(any(), any(), any())).then(invocation ->
+                invocation.<String>getArgument(2).getBytes());
+        when(valueSerializer.serialize(any(), any(), any())).then(invocation ->
+                invocation.<String>getArgument(2).getBytes());
 
         String value = "value";
-
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
-        EasyMock.expect(keySerializer.serialize(topic, record.headers(), null)).andReturn(null).once();
-        EasyMock.expect(valueSerializer.serialize(topic, record.headers(), value)).andReturn(value.getBytes()).once();
-
-        PowerMock.replay(keySerializer);
-        PowerMock.replay(valueSerializer);
-
+        String key = "key";
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
 
         //ensure headers can be mutated pre send.
         record.headers().add(new RecordHeader("test", "header2".getBytes()));
-
         producer.send(record, null);
 
         //ensure headers are closed and cannot be mutated post send
@@ -492,11 +510,12 @@ public class KafkaProducerTest {
         }
 
         //ensure existing headers are not changed, and last header for key is still original value
-        assertTrue(Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes()));
+        assertArrayEquals(record.headers().lastHeader("test").value(), "header2".getBytes());
 
-        PowerMock.verify(valueSerializer);
-        PowerMock.verify(keySerializer);
+        verify(valueSerializer).serialize(topic, record.headers(), value);
+        verify(keySerializer).serialize(topic, record.headers(), key);
 
+        producer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -522,40 +541,38 @@ public class KafkaProducerTest {
         }
     }
 
-    @PrepareOnlyThisForTest(Metadata.class)
     @Test
-    public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception {
+    public void testInterceptorPartitionSetOnTooLargeRecord() {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
+        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(),
+                new StringSerializer()));
         String topic = "topic";
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
 
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(),
-                new StringSerializer());
-        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
-        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
+        Metadata metadata = new Metadata(0, 90000, true);
         final Cluster cluster = new Cluster(
             "dummy",
             Collections.singletonList(new Node(0, "host1", 1000)),
-            Collections.singletonList(new PartitionInfo(topic, 0, null, null, null)),
+            Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])),
             Collections.emptySet(),
             Collections.emptySet());
-        EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
+        metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
 
-        // Mock interceptors field
         @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
-        ProducerInterceptors<String, String> interceptors = PowerMock.createMock(ProducerInterceptors.class);
-        EasyMock.expect(interceptors.onSend(record)).andReturn(record);
-        interceptors.onSendError(EasyMock.eq(record), EasyMock.notNull(), EasyMock.notNull());
-        EasyMock.expectLastCall();
-        MemberModifier.field(KafkaProducer.class, "interceptors").set(producer, interceptors);
-
-        PowerMock.replay(metadata);
-        EasyMock.replay(interceptors);
+                ProducerInterceptors<String, String> interceptors = mock(ProducerInterceptors.class);
+        KafkaProducer<String, String> producer = new KafkaProducer<>(config, null, null,
+                metadata, null, interceptors, Time.SYSTEM);
+
+        when(interceptors.onSend(any())).then(invocation -> invocation.getArgument(0));
+
         producer.send(record);
 
-        EasyMock.verify(interceptors);
+        verify(interceptors).onSend(record);
+        verify(interceptors).onSendError(eq(record), notNull(), notNull());
+
+        producer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -577,7 +594,7 @@ public class KafkaProducerTest {
         props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 
-        Time time = new MockTime();
+        Time time = Time.SYSTEM;
         Cluster cluster = TestUtils.singletonCluster("topic", 1);
         Node node = cluster.nodes().get(0);
 
@@ -589,7 +606,7 @@ public class KafkaProducerTest {
 
         try (Producer<String, String> producer = new KafkaProducer<>(
                 new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, client)) {
+                null, null, metadata, client, null, time)) {
             producer.initTransactions();
             fail("initTransactions() should have raised TimeoutException");
         }
@@ -614,7 +631,7 @@ public class KafkaProducerTest {
 
         Producer<String, String> producer = new KafkaProducer<>(
                 new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, client);
+                null, null, metadata, client, null, time);
         try {
             producer.initTransactions();
         } catch (TimeoutException e) {
@@ -630,7 +647,6 @@ public class KafkaProducerTest {
 
     @Test
     public void testSendToInvalidTopic() throws Exception {
-
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
         props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
@@ -647,9 +663,9 @@ public class KafkaProducerTest {
 
         Producer<String, String> producer = new KafkaProducer<>(new ProducerConfig(
                 ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, client);
+                null, null, metadata, client, null, time);
 
-        String invalidTopicName = "topic abc";          // Invalid topic name due to space
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
         ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
 
         Set<String> invalidTopic = new HashSet<>();
@@ -665,8 +681,11 @@ public class KafkaProducerTest {
 
         Future<RecordMetadata> future = producer.send(record);
 
-        assertEquals("Cluster has incorrect invalid topic list.", metaDataUpdateResponseCluster.invalidTopics(), metadata.fetch().invalidTopics());
+        assertEquals("Cluster has incorrect invalid topic list", metaDataUpdateResponseCluster.invalidTopics(),
+                metadata.fetch().invalidTopics());
         TestUtils.assertFutureError(future, InvalidTopicException.class);
+
+        producer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -689,7 +708,7 @@ public class KafkaProducerTest {
 
         Producer<String, String> producer = new KafkaProducer<>(
                 new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, client);
+                new StringSerializer(), new StringSerializer(), metadata, client, null, time);
 
         ExecutorService executor = Executors.newSingleThreadExecutor();
         final AtomicReference<Exception> sendException = new AtomicReference<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 23fc541..ce74eb1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -16,11 +16,8 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
@@ -36,25 +33,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.anyLong;
-import static org.easymock.EasyMock.anyDouble;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.anyString;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
-
-@RunWith(PowerMockRunner.class)
 public class BufferPoolTest {
     private final MockTime time = new MockTime();
     private final Metrics metrics = new Metrics(time);
@@ -241,36 +229,25 @@ public class BufferPoolTest {
         // both the allocate() called by threads t1 and t2 should have been interrupted and the waiters queue should be empty
         assertEquals(pool.queued(), 0);
     }
-
-    @PrepareForTest({Sensor.class, MetricName.class})
+    
     @Test
     public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception {
-        Metrics mockedMetrics = createNiceMock(Metrics.class);
-        Sensor mockedSensor = createNiceMock(Sensor.class);
-        MetricName metricName = createNiceMock(MetricName.class);
-        MetricName rateMetricName = createNiceMock(MetricName.class);
-        MetricName totalMetricName = createNiceMock(MetricName.class);
-
-        expect(mockedMetrics.sensor(BufferPool.WAIT_TIME_SENSOR_NAME)).andReturn(mockedSensor);
+        BufferPool bufferPool = spy(new BufferPool(2, 1, new Metrics(), time, metricGroup));
+        doThrow(new OutOfMemoryError()).when(bufferPool).recordWaitTime(anyLong());
 
-        mockedSensor.record(anyDouble(), anyLong());
-        expectLastCall().andThrow(new OutOfMemoryError());
-        expect(mockedMetrics.metricName(anyString(), eq(metricGroup), anyString())).andReturn(metricName);
-        expect(mockedSensor.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName))).andReturn(true);
-
-        replay(mockedMetrics, mockedSensor, metricName);
-
-        BufferPool bufferPool = new BufferPool(2, 1, mockedMetrics, time,  metricGroup);
         bufferPool.allocate(1, 0);
         try {
             bufferPool.allocate(2, 1000);
-            assertTrue("Expected oom.", false);
+            fail("Expected oom.");
         } catch (OutOfMemoryError expected) {
         }
         assertEquals(1, bufferPool.availableMemory());
         assertEquals(0, bufferPool.queued());
+        assertEquals(1, bufferPool.unallocatedMemory());
         //This shouldn't timeout
         bufferPool.allocate(1, 0);
+
+        verify(bufferPool).recordWaitTime(anyLong());
     }
 
     private static class BufferPoolAllocator implements Runnable {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index f93f343..f2a34f6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -84,24 +84,23 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
 
-import static org.easymock.EasyMock.anyBoolean;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyLong;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.geq;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
+import static org.mockito.AdditionalMatchers.geq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.spy;
 
 public class SenderTest {
 
@@ -140,7 +139,8 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
+                null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -2037,33 +2037,27 @@ public class SenderTest {
 
     @Test
     public void testResetNextBatchExpiry() throws Exception {
-        MockClient delegateClient = new MockClient(time);
-        client = mock(MockClient.class);
-        expect(client.ready(anyObject(), anyLong())).andDelegateTo(delegateClient).anyTimes();
-        expect(
-            client.newClientRequest(
-                anyString(), anyObject(), anyLong(), anyBoolean(), anyInt(), anyObject()))
-            .andDelegateTo(delegateClient).anyTimes();
-        client.send(anyObject(), anyLong());
-        expectLastCall().andDelegateTo(delegateClient).anyTimes();
-        expect(client.poll(eq(0L), anyLong())).andDelegateTo(delegateClient).times(1);
-        expect(client.poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong()))
-            .andDelegateTo(delegateClient)
-            .times(1);
-        expect(client.poll(geq(1L), anyLong())).andDelegateTo(delegateClient).times(1);
-        replay(client);
+        client = spy(new MockClient(time));
 
         setupWithTransactionState(null);
 
-        accumulator.append(
-            tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
+        accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null,
+                MAX_BLOCK_TIMEOUT);
 
         sender.run(time.milliseconds());
         sender.run(time.milliseconds());
         time.setCurrentTimeMs(time.milliseconds() + accumulator.getDeliveryTimeoutMs() + 1);
         sender.run(time.milliseconds());
 
-        verify(client);
+        InOrder inOrder = inOrder(client);
+        inOrder.verify(client, atLeastOnce()).ready(any(), anyLong());
+        inOrder.verify(client, atLeastOnce()).newClientRequest(anyString(), any(), anyLong(), anyBoolean(), anyInt(),
+                any());
+        inOrder.verify(client, atLeastOnce()).send(any(), anyLong());
+        inOrder.verify(client).poll(eq(0L), anyLong());
+        inOrder.verify(client).poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong());
+        inOrder.verify(client).poll(geq(1L), anyLong());
+
     }
 
     private class MatchingBufferPool extends BufferPool {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
index c2b89fe..27daf0f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.easymock.EasyMock;
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -35,13 +34,14 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class ChannelBuildersTest {
 
     @Test
     public void testCreateOldPrincipalBuilder() throws Exception {
-        TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
-        Authenticator authenticator = EasyMock.mock(Authenticator.class);
+        TransportLayer transportLayer = mock(TransportLayer.class);
+        Authenticator authenticator = mock(Authenticator.class);
 
         Map<String, Object> configs = new HashMap<>();
         configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, OldPrincipalBuilder.class);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index cef7c7f..6cf7586 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.IMocksControl;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,15 +52,17 @@ import java.util.Random;
 import java.util.Set;
 import java.util.Optional;
 
-import static org.easymock.EasyMock.createControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -561,31 +562,20 @@ public class SelectorTest {
      */
     @Test
     public void testConnectDisconnectDuringInSinglePoll() throws Exception {
-        IMocksControl control = createControl();
-
         // channel is connected, not ready and it throws an exception during prepare
-        KafkaChannel kafkaChannel = control.createMock(KafkaChannel.class);
-        expect(kafkaChannel.id()).andStubReturn("1");
-        expect(kafkaChannel.socketDescription()).andStubReturn("");
-        expect(kafkaChannel.state()).andStubReturn(ChannelState.NOT_CONNECTED);
-        expect(kafkaChannel.finishConnect()).andReturn(true);
-        expect(kafkaChannel.isConnected()).andStubReturn(true);
-        // record void method invocations
-        kafkaChannel.disconnect();
-        kafkaChannel.close();
-        expect(kafkaChannel.ready()).andReturn(false).anyTimes();
-        // prepare throws an exception
-        kafkaChannel.prepare();
-        expectLastCall().andThrow(new IOException());
-
-        SelectionKey selectionKey = control.createMock(SelectionKey.class);
-        expect(kafkaChannel.selectionKey()).andStubReturn(selectionKey);
-        expect(selectionKey.channel()).andReturn(SocketChannel.open());
-        expect(selectionKey.readyOps()).andStubReturn(SelectionKey.OP_CONNECT);
-        selectionKey.cancel();
-        expectLastCall();
-
-        control.replay();
+        KafkaChannel kafkaChannel = mock(KafkaChannel.class);
+        when(kafkaChannel.id()).thenReturn("1");
+        when(kafkaChannel.socketDescription()).thenReturn("");
+        when(kafkaChannel.state()).thenReturn(ChannelState.NOT_CONNECTED);
+        when(kafkaChannel.finishConnect()).thenReturn(true);
+        when(kafkaChannel.isConnected()).thenReturn(true);
+        when(kafkaChannel.ready()).thenReturn(false);
+        doThrow(new IOException()).when(kafkaChannel).prepare();
+
+        SelectionKey selectionKey = mock(SelectionKey.class);
+        when(kafkaChannel.selectionKey()).thenReturn(selectionKey);
+        when(selectionKey.channel()).thenReturn(SocketChannel.open());
+        when(selectionKey.readyOps()).thenReturn(SelectionKey.OP_CONNECT);
 
         selectionKey.attach(kafkaChannel);
         Set<SelectionKey> selectionKeys = Utils.mkSet(selectionKey);
@@ -595,7 +585,10 @@ public class SelectorTest {
         assertTrue(selector.disconnected().containsKey(kafkaChannel.id()));
         assertNull(selectionKey.attachment());
 
-        control.verify();
+        verify(kafkaChannel, atLeastOnce()).ready();
+        verify(kafkaChannel).disconnect();
+        verify(kafkaChannel).close();
+        verify(selectionKey).cancel();
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 77de33f..4b2b361 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -23,8 +23,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -45,8 +43,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-public class FileRecordsTest extends EasyMockSupport {
+public class FileRecordsTest {
 
     private byte[][] values = new byte[][] {
             "abcd".getBytes(),
@@ -66,10 +70,7 @@ public class FileRecordsTest extends EasyMockSupport {
     public void testAppendProtectsFromOverflow() throws Exception {
         File fileMock = mock(File.class);
         FileChannel fileChannelMock = mock(FileChannel.class);
-        EasyMock.expect(fileChannelMock.size()).andStubReturn((long) Integer.MAX_VALUE);
-        EasyMock.expect(fileChannelMock.position(Integer.MAX_VALUE)).andReturn(fileChannelMock);
-
-        replayAll();
+        when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE);
 
         FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false);
         append(records, values);
@@ -79,9 +80,7 @@ public class FileRecordsTest extends EasyMockSupport {
     public void testOpenOversizeFile() throws Exception {
         File fileMock = mock(File.class);
         FileChannel fileChannelMock = mock(FileChannel.class);
-        EasyMock.expect(fileChannelMock.size()).andStubReturn(Integer.MAX_VALUE + 5L);
-
-        replayAll();
+        when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L);
 
         new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false);
     }
@@ -262,16 +261,16 @@ public class FileRecordsTest extends EasyMockSupport {
      */
     @Test
     public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException {
-        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        FileChannel channelMock = mock(FileChannel.class);
 
-        EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
-        EasyMock.expect(channelMock.position(42L)).andReturn(null);
-        EasyMock.replay(channelMock);
+        when(channelMock.size()).thenReturn(42L);
+        when(channelMock.position(42L)).thenReturn(null);
 
         FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
         fileRecords.truncateTo(42);
 
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).size();
+        verify(channelMock, times(0)).truncate(anyLong());
     }
 
     /**
@@ -280,11 +279,9 @@ public class FileRecordsTest extends EasyMockSupport {
      */
     @Test
     public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOException {
-        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        FileChannel channelMock = mock(FileChannel.class);
 
-        EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
-        EasyMock.expect(channelMock.position(42L)).andReturn(null);
-        EasyMock.replay(channelMock);
+        when(channelMock.size()).thenReturn(42L);
 
         FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
 
@@ -295,7 +292,7 @@ public class FileRecordsTest extends EasyMockSupport {
             // expected
         }
 
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).size();
     }
 
     /**
@@ -303,17 +300,16 @@ public class FileRecordsTest extends EasyMockSupport {
      */
     @Test
     public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException {
-        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        FileChannel channelMock = mock(FileChannel.class);
 
-        EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
-        EasyMock.expect(channelMock.position(42L)).andReturn(null).once();
-        EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once();
-        EasyMock.replay(channelMock);
+        when(channelMock.size()).thenReturn(42L);
+        when(channelMock.truncate(anyLong())).thenReturn(channelMock);
 
         FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
         fileRecords.truncateTo(23);
 
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).size();
+        verify(channelMock).truncate(23);
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index 25bcd50..a05a850 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -20,11 +20,8 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
-import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.scram.internals.ScramMechanism;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
 import org.junit.Test;
 
 import javax.net.ssl.SSLSession;
@@ -33,8 +30,13 @@ import java.net.InetAddress;
 import java.security.Principal;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
+public class DefaultKafkaPrincipalBuilderTest {
 
     @Test
     @SuppressWarnings("deprecation")
@@ -43,12 +45,7 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
         Authenticator authenticator = mock(Authenticator.class);
         PrincipalBuilder oldPrincipalBuilder = mock(PrincipalBuilder.class);
 
-        EasyMock.expect(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator))
-                .andReturn(new DummyPrincipal("foo"));
-        oldPrincipalBuilder.close();
-        EasyMock.expectLastCall();
-
-        replayAll();
+        when(oldPrincipalBuilder.buildPrincipal(any(), any())).thenReturn(new DummyPrincipal("foo"));
 
         DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator,
                 transportLayer, oldPrincipalBuilder, null);
@@ -59,15 +56,17 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
         assertEquals("foo", principal.getName());
 
         builder.close();
-        verifyAll();
+
+        verify(oldPrincipalBuilder).buildPrincipal(transportLayer, authenticator);
+        verify(oldPrincipalBuilder).close();
     }
 
     @Test
     public void testReturnAnonymousPrincipalForPlaintext() throws Exception {
-        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
-        assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(
-                new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())));
-        builder.close();
+        try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null)) {
+            assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(
+                    new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())));
+        }
     }
 
     @Test
@@ -78,12 +77,8 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
         PrincipalBuilder oldPrincipalBuilder = mock(PrincipalBuilder.class);
         SSLSession session = mock(SSLSession.class);
 
-        EasyMock.expect(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator))
-                .andReturn(new DummyPrincipal("foo"));
-        oldPrincipalBuilder.close();
-        EasyMock.expectLastCall();
-
-        replayAll();
+        when(oldPrincipalBuilder.buildPrincipal(any(), any()))
+                .thenReturn(new DummyPrincipal("foo"));
 
         DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator,
                 transportLayer, oldPrincipalBuilder, null);
@@ -94,16 +89,16 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
         assertEquals("foo", principal.getName());
 
         builder.close();
-        verifyAll();
+
+        verify(oldPrincipalBuilder).buildPrincipal(transportLayer, authenticator);
+        verify(oldPrincipalBuilder).close();
     }
 
     @Test
     public void testUseSessionPeerPrincipalForSsl() throws Exception {
         SSLSession session = mock(SSLSession.class);
 
-        EasyMock.expect(session.getPeerPrincipal()).andReturn(new DummyPrincipal("foo"));
-
-        replayAll();
+        when(session.getPeerPrincipal()).thenReturn(new DummyPrincipal("foo"));
 
         DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
 
@@ -113,17 +108,16 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
         assertEquals("foo", principal.getName());
 
         builder.close();
-        verifyAll();
+
+        verify(session, atLeastOnce()).getPeerPrincipal();
     }
 
     @Test
     public void testPrincipalBuilderScram() throws Exception {
         SaslServer server = mock(SaslServer.class);
 
-        EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
-        EasyMock.expect(server.getAuthorizationID()).andReturn("foo");
-
-        replayAll();
+        when(server.getMechanismName()).thenReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
+        when(server.getAuthorizationID()).thenReturn("foo");
 
         DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
 
@@ -133,7 +127,9 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
         assertEquals("foo", principal.getName());
 
         builder.close();
-        verifyAll();
+
+        verify(server, atLeastOnce()).getMechanismName();
+        verify(server, atLeastOnce()).getAuthorizationID();
     }
 
     @Test
@@ -141,12 +137,9 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
         SaslServer server = mock(SaslServer.class);
         KerberosShortNamer kerberosShortNamer = mock(KerberosShortNamer.class);
 
-        EasyMock.expect(server.getMechanismName()).andReturn(SaslConfigs.GSSAPI_MECHANISM);
-        EasyMock.expect(server.getAuthorizationID()).andReturn("foo/host@REALM.COM");
-        EasyMock.expect(kerberosShortNamer.shortName(EasyMock.anyObject(KerberosName.class)))
-                .andReturn("foo");
-
-        replayAll();
+        when(server.getMechanismName()).thenReturn(SaslConfigs.GSSAPI_MECHANISM);
+        when(server.getAuthorizationID()).thenReturn("foo/host@REALM.COM");
+        when(kerberosShortNamer.shortName(any())).thenReturn("foo");
 
         DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer);
 
@@ -156,7 +149,10 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
         assertEquals("foo", principal.getName());
 
         builder.close();
-        verifyAll();
+
+        verify(server, atLeastOnce()).getMechanismName();
+        verify(server, atLeastOnce()).getAuthorizationID();
+        verify(kerberosShortNamer, atLeastOnce()).shortName(any());
     }
 
     private static class DummyPrincipal implements Principal {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 1ae83ee..d62261a 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -28,9 +28,6 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
 import org.junit.Test;
 
 import javax.security.auth.Subject;
@@ -42,33 +39,32 @@ import java.util.Map;
 
 import static org.apache.kafka.common.security.scram.internals.ScramMechanism.SCRAM_SHA_256;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class SaslServerAuthenticatorTest {
 
     @Test(expected = InvalidReceiveException.class)
     public void testOversizeRequest() throws IOException {
-        TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
+        TransportLayer transportLayer = mock(TransportLayer.class);
         Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
                 Collections.singletonList(SCRAM_SHA_256.mechanismName()));
         SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName());
 
-        final Capture<ByteBuffer> size = EasyMock.newCapture();
-        EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
-            @Override
-            public Integer answer() {
-                size.getValue().putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
-                return 4;
-            }
+        when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
+            invocation.<ByteBuffer>getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
+            return 4;
         });
-
-        EasyMock.replay(transportLayer);
-
         authenticator.authenticate();
+        verify(transportLayer).read(any(ByteBuffer.class));
     }
 
     @Test
     public void testUnexpectedRequestType() throws IOException {
-        TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
+        TransportLayer transportLayer = mock(TransportLayer.class);
         Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
                 Collections.singletonList(SCRAM_SHA_256.mechanismName()));
         SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName());
@@ -76,33 +72,23 @@ public class SaslServerAuthenticatorTest {
         final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243);
         final Struct headerStruct = header.toStruct();
 
-        final Capture<ByteBuffer> size = EasyMock.newCapture();
-        EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
-            @Override
-            public Integer answer() {
-                size.getValue().putInt(headerStruct.sizeOf());
-                return 4;
-            }
-        });
-
-        final Capture<ByteBuffer> payload = EasyMock.newCapture();
-        EasyMock.expect(transportLayer.read(EasyMock.capture(payload))).andAnswer(new IAnswer<Integer>() {
-            @Override
-            public Integer answer() {
-                // serialize only the request header. the authenticator should not parse beyond this
-                headerStruct.writeTo(payload.getValue());
-                return headerStruct.sizeOf();
-            }
+        when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
+            invocation.<ByteBuffer>getArgument(0).putInt(headerStruct.sizeOf());
+            return 4;
+        }).then(invocation -> {
+            // serialize only the request header. the authenticator should not parse beyond this
+            headerStruct.writeTo(invocation.getArgument(0));
+            return headerStruct.sizeOf();
         });
 
-        EasyMock.replay(transportLayer);
-
         try {
             authenticator.authenticate();
             fail("Expected authenticate() to raise an exception");
         } catch (IllegalSaslStateException e) {
             // expected exception
         }
+
+        verify(transportLayer, times(2)).read(any(ByteBuffer.class));
     }
 
     private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer, String mechanism) throws IOException {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
index 51d6012..2ecde99 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -40,7 +42,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
 import org.apache.kafka.common.security.auth.SaslExtensions;
-import org.easymock.EasyMock;
 import org.junit.Test;
 
 public class OAuthBearerLoginModuleTest {
@@ -124,12 +125,10 @@ public class OAuthBearerLoginModuleTest {
         Set<Object> publicCredentials = subject.getPublicCredentials();
 
         // Create callback handler
-        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
-            EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)};
-        SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class),
-            EasyMock.mock(SaslExtensions.class), EasyMock.mock(SaslExtensions.class)};
-        EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing
-        EasyMock.replay(extensions[0], extensions[2]);
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class),
+            mock(OAuthBearerToken.class), mock(OAuthBearerToken.class)};
+        SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class),
+            mock(SaslExtensions.class), mock(SaslExtensions.class)};
         TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions);
 
         // Create login modules
@@ -207,6 +206,9 @@ public class OAuthBearerLoginModuleTest {
         assertEquals(1, publicCredentials.size());
         assertSame(tokens[2], privateCredentials.iterator().next());
         assertSame(extensions[2], publicCredentials.iterator().next());
+
+        verifyZeroInteractions((Object[]) tokens);
+        verifyZeroInteractions((Object[]) extensions);
     }
 
     @Test
@@ -220,12 +222,10 @@ public class OAuthBearerLoginModuleTest {
         Set<Object> publicCredentials = subject.getPublicCredentials();
 
         // Create callback handler
-        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
-            EasyMock.mock(OAuthBearerToken.class)};
-        SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class),
-            EasyMock.mock(SaslExtensions.class)};
-        EasyMock.replay(tokens[0], tokens[1]); // expect nothing
-        EasyMock.replay(extensions[0], extensions[1]);
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class),
+            mock(OAuthBearerToken.class)};
+        SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class),
+            mock(SaslExtensions.class)};
         TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions);
 
         // Create login modules
@@ -268,6 +268,9 @@ public class OAuthBearerLoginModuleTest {
         // Should have nothing again
         assertEquals(0, privateCredentials.size());
         assertEquals(0, publicCredentials.size());
+
+        verifyZeroInteractions((Object[]) tokens);
+        verifyZeroInteractions((Object[]) extensions);
     }
 
     @Test
@@ -280,12 +283,10 @@ public class OAuthBearerLoginModuleTest {
         Set<Object> publicCredentials = subject.getPublicCredentials();
 
         // Create callback handler
-        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
-            EasyMock.mock(OAuthBearerToken.class)};
-        SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class),
-            EasyMock.mock(SaslExtensions.class)};
-        EasyMock.replay(tokens[0], tokens[1]); // expect nothing
-        EasyMock.replay(extensions[0], extensions[1]);
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class),
+            mock(OAuthBearerToken.class)};
+        SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class),
+            mock(SaslExtensions.class)};
         TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions);
 
         // Create login module
@@ -319,6 +320,9 @@ public class OAuthBearerLoginModuleTest {
         // Should have nothing again
         assertEquals(0, privateCredentials.size());
         assertEquals(0, publicCredentials.size());
+
+        verifyZeroInteractions((Object[]) tokens);
+        verifyZeroInteractions((Object[]) extensions);
     }
 
     @Test
@@ -332,12 +336,10 @@ public class OAuthBearerLoginModuleTest {
         Set<Object> publicCredentials = subject.getPublicCredentials();
 
         // Create callback handler
-        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
-            EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)};
-        SaslExtensions[] extensions = new SaslExtensions[] {EasyMock.mock(SaslExtensions.class),
-            EasyMock.mock(SaslExtensions.class), EasyMock.mock(SaslExtensions.class)};
-        EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing
-        EasyMock.replay(extensions[0], extensions[1], extensions[2]);
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class),
+            mock(OAuthBearerToken.class), mock(OAuthBearerToken.class)};
+        SaslExtensions[] extensions = new SaslExtensions[] {mock(SaslExtensions.class),
+            mock(SaslExtensions.class), mock(SaslExtensions.class)};
         TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, extensions);
 
         // Create login modules
@@ -402,6 +404,9 @@ public class OAuthBearerLoginModuleTest {
         assertSame(tokens[2], privateCredentials.iterator().next());
         assertEquals(1, publicCredentials.size());
         assertSame(extensions[2], publicCredentials.iterator().next());
+
+        verifyZeroInteractions((Object[]) tokens);
+        verifyZeroInteractions((Object[]) extensions);
     }
 
     /**
@@ -413,9 +418,8 @@ public class OAuthBearerLoginModuleTest {
         Subject subject = new Subject();
 
         // Create callback handler
-        OAuthBearerToken[] tokens = new OAuthBearerToken[] {EasyMock.mock(OAuthBearerToken.class),
-                EasyMock.mock(OAuthBearerToken.class), EasyMock.mock(OAuthBearerToken.class)};
-        EasyMock.replay(tokens[0], tokens[1], tokens[2]); // expect nothing
+        OAuthBearerToken[] tokens = new OAuthBearerToken[] {mock(OAuthBearerToken.class),
+                mock(OAuthBearerToken.class), mock(OAuthBearerToken.class)};
         TestCallbackHandler testTokenCallbackHandler = new TestCallbackHandler(tokens, new SaslExtensions[] {RAISE_UNSUPPORTED_CB_EXCEPTION_FLAG});
 
         // Create login modules
@@ -429,5 +433,7 @@ public class OAuthBearerLoginModuleTest {
         SaslExtensions extensions = subject.getPublicCredentials(SaslExtensions.class).iterator().next();
         assertNotNull(extensions);
         assertTrue(extensions.map().isEmpty());
+
+        verifyZeroInteractions((Object[]) tokens);
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
index fad7431..6d23f62 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.SaslExtensions;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
 import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
-import org.easymock.EasyMockSupport;
 import org.junit.Test;
 
 import javax.security.auth.callback.Callback;
@@ -39,7 +38,7 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class OAuthBearerSaslClientTest extends EasyMockSupport {
+public class OAuthBearerSaslClientTest {
 
     private static final Map<String, String> TEST_PROPERTIES = new LinkedHashMap<String, String>() {
         {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
index 9c62bef..cc0b983 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLoginTest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.common.security.oauthbearer.internals.expiring;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -42,8 +45,8 @@ import org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringC
 import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.easymock.EasyMock;
 import org.junit.Test;
+import org.mockito.InOrder;
 
 public class ExpiringCredentialRefreshingLoginTest {
     private static final Configuration EMPTY_WILDCARD_CONFIGURATION;
@@ -257,24 +260,8 @@ public class ExpiringCredentialRefreshingLoginTest {
         for (int numExpectedRefreshes : new int[] {0, 1, 2}) {
             for (boolean clientReloginAllowedBeforeLogout : new boolean[] {true, false}) {
                 Subject subject = new Subject();
-                /*
-                 * Create a mock and record the fact that we expect login() to be invoked
-                 * followed by getSubject() and then ultimately followed by numExpectedRefreshes
-                 * pairs of either login()/logout() or logout()/login() calls
-                 */
-                final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
-                mockLoginContext.login();
-                EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
-                for (int i = 0; i < numExpectedRefreshes; ++i) {
-                    if (clientReloginAllowedBeforeLogout) {
-                        mockLoginContext.login();
-                        mockLoginContext.logout();
-                    } else {
-                        mockLoginContext.logout();
-                        mockLoginContext.login();
-                    }
-                }
-                EasyMock.replay(mockLoginContext);
+                final LoginContext mockLoginContext = mock(LoginContext.class);
+                when(mockLoginContext.getSubject()).thenReturn(subject);
 
                 MockTime mockTime = new MockTime();
                 long startMs = mockTime.milliseconds();
@@ -335,6 +322,23 @@ public class ExpiringCredentialRefreshingLoginTest {
                     assertEquals((i + 1) * 1000 * 60 * refreshEveryMinutes, waiter.get().longValue() - startMs);
                 }
                 assertFalse(waiters.get(numExpectedRefreshes).isDone());
+
+                /*
+                 * We expect login() to be invoked, followed by getSubject(), and then numExpectedRefreshes pairs of
+                 * login()/logout() calls if clientReloginAllowedBeforeLogout is true, or logout()/login() calls otherwise
+                 */
+                InOrder inOrder = inOrder(mockLoginContext);
+                inOrder.verify(mockLoginContext).login();
+                inOrder.verify(mockLoginContext).getSubject();
+                for (int i = 0; i < numExpectedRefreshes; ++i) {
+                    if (clientReloginAllowedBeforeLogout) {
+                        inOrder.verify(mockLoginContext).login();
+                        inOrder.verify(mockLoginContext).logout();
+                    } else {
+                        inOrder.verify(mockLoginContext).logout();
+                        inOrder.verify(mockLoginContext).login();
+                    }
+                }
             }
         }
     }
@@ -343,20 +347,9 @@ public class ExpiringCredentialRefreshingLoginTest {
     public void testRefreshWithExpirationSmallerThanConfiguredBuffers() throws Exception {
         int numExpectedRefreshes = 1;
         boolean clientReloginAllowedBeforeLogout = true;
+        final LoginContext mockLoginContext = mock(LoginContext.class);
         Subject subject = new Subject();
-        /*
-         * Create a mock and record the fact that we expect login() to be invoked
-         * followed by getSubject() and then ultimately followed by numExpectedRefreshes
-         * pairs of login()/logout() calls
-         */
-        final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
-        mockLoginContext.login();
-        EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
-        for (int i = 0; i < numExpectedRefreshes; ++i) {
-            mockLoginContext.login();
-            mockLoginContext.logout();
-        }
-        EasyMock.replay(mockLoginContext);
+        when(mockLoginContext.getSubject()).thenReturn(subject);
 
         MockTime mockTime = new MockTime();
         long startMs = mockTime.milliseconds();
@@ -419,6 +412,13 @@ public class ExpiringCredentialRefreshingLoginTest {
             assertEquals((i + 1) * 1000 * 60 * refreshEveryMinutes, waiter.get().longValue() - startMs);
         }
         assertFalse(waiters.get(numExpectedRefreshes).isDone());
+
+        InOrder inOrder = inOrder(mockLoginContext);
+        inOrder.verify(mockLoginContext).login();
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            inOrder.verify(mockLoginContext).login();
+            inOrder.verify(mockLoginContext).logout();
+        }
     }
 
     @Test
@@ -426,19 +426,8 @@ public class ExpiringCredentialRefreshingLoginTest {
         int numExpectedRefreshes = 1;
         boolean clientReloginAllowedBeforeLogout = true;
         Subject subject = new Subject();
-        /*
-         * Create a mock and record the fact that we expect login() to be invoked
-         * followed by getSubject() and then ultimately followed by numExpectedRefreshes
-         * pairs of login()/logout() calls
-         */
-        final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
-        mockLoginContext.login();
-        EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
-        for (int i = 0; i < numExpectedRefreshes; ++i) {
-            mockLoginContext.login();
-            mockLoginContext.logout();
-        }
-        EasyMock.replay(mockLoginContext);
+        final LoginContext mockLoginContext = mock(LoginContext.class);
+        when(mockLoginContext.getSubject()).thenReturn(subject);
 
         MockTime mockTime = new MockTime();
         long startMs = mockTime.milliseconds();
@@ -504,6 +493,13 @@ public class ExpiringCredentialRefreshingLoginTest {
                     waiter.get().longValue() - startMs);
         }
         assertFalse(waiters.get(numExpectedRefreshes).isDone());
+
+        InOrder inOrder = inOrder(mockLoginContext);
+        inOrder.verify(mockLoginContext).login();
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            inOrder.verify(mockLoginContext).login();
+            inOrder.verify(mockLoginContext).logout();
+        }
     }
 
     @Test
@@ -511,19 +507,8 @@ public class ExpiringCredentialRefreshingLoginTest {
         int numExpectedRefreshes = 1;
         boolean clientReloginAllowedBeforeLogout = true;
         Subject subject = new Subject();
-        /*
-         * Create a mock and record the fact that we expect login() to be invoked
-         * followed by getSubject() and then ultimately followed by numExpectedRefreshes
-         * pairs of login()/logout() calls
-         */
-        final LoginContext mockLoginContext = EasyMock.strictMock(LoginContext.class);
-        mockLoginContext.login();
-        EasyMock.expect(mockLoginContext.getSubject()).andReturn(subject);
-        for (int i = 0; i < numExpectedRefreshes; ++i) {
-            mockLoginContext.login();
-            mockLoginContext.logout();
-        }
-        EasyMock.replay(mockLoginContext);
+        final LoginContext mockLoginContext = mock(LoginContext.class);
+        when(mockLoginContext.getSubject()).thenReturn(subject);
 
         MockTime mockTime = new MockTime();
         long startMs = mockTime.milliseconds();
@@ -588,6 +573,13 @@ public class ExpiringCredentialRefreshingLoginTest {
                     waiter.get().longValue() - startMs);
         }
         assertFalse(waiters.get(numExpectedRefreshes).isDone());
+
+        InOrder inOrder = inOrder(mockLoginContext);
+        inOrder.verify(mockLoginContext).login();
+        for (int i = 0; i < numExpectedRefreshes; ++i) {
+            inOrder.verify(mockLoginContext).login();
+            inOrder.verify(mockLoginContext).logout();
+        }
     }
 
     private static List<KafkaFutureImpl<Long>> addWaiters(MockScheduler mockScheduler, long refreshEveryMillis,
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index b258a34..d5029b6 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -17,9 +17,8 @@
 package org.apache.kafka.common.utils;
 
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
 import org.junit.Test;
+import org.mockito.stubbing.OngoingStubbing;
 
 import java.io.Closeable;
 import java.io.DataOutputStream;
@@ -34,6 +33,8 @@ import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.kafka.common.utils.Utils.formatAddress;
 import static org.apache.kafka.common.utils.Utils.formatBytes;
@@ -45,6 +46,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class UtilsTest {
 
@@ -324,17 +331,15 @@ public class UtilsTest {
      */
     @Test
     public void testReadFullyOrFailWithPartialFileChannelReads() throws IOException {
-        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        FileChannel channelMock = mock(FileChannel.class);
         final int bufferSize = 100;
         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        StringBuilder expectedBufferContent = new StringBuilder();
-        fileChannelMockExpectReadWithRandomBytes(channelMock, expectedBufferContent, bufferSize);
-        EasyMock.replay(channelMock);
+        String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize);
         Utils.readFullyOrFail(channelMock, buffer, 0L, "test");
-        assertEquals("The buffer should be populated correctly", expectedBufferContent.toString(),
+        assertEquals("The buffer should be populated correctly", expectedBufferContent,
                 new String(buffer.array()));
         assertFalse("The buffer should be filled", buffer.hasRemaining());
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).read(any(), anyLong());
     }
 
     /**
@@ -343,73 +348,62 @@ public class UtilsTest {
      */
     @Test
     public void testReadFullyWithPartialFileChannelReads() throws IOException {
-        FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        FileChannel channelMock = mock(FileChannel.class);
         final int bufferSize = 100;
-        StringBuilder expectedBufferContent = new StringBuilder();
-        fileChannelMockExpectReadWithRandomBytes(channelMock, expectedBufferContent, bufferSize);
-        EasyMock.replay(channelMock);
+        String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize);
         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
         Utils.readFully(channelMock, buffer, 0L);
-        assertEquals("The buffer should be populated correctly.", expectedBufferContent.toString(),
+        assertEquals("The buffer should be populated correctly.", expectedBufferContent,
                 new String(buffer.array()));
         assertFalse("The buffer should be filled", buffer.hasRemaining());
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).read(any(), anyLong());
     }
 
     @Test
     public void testReadFullyIfEofIsReached() throws IOException {
-        final FileChannel channelMock = EasyMock.createMock(FileChannel.class);
+        final FileChannel channelMock = mock(FileChannel.class);
         final int bufferSize = 100;
         final String fileChannelContent = "abcdefghkl";
         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        EasyMock.expect(channelMock.size()).andReturn((long) fileChannelContent.length());
-        EasyMock.expect(channelMock.read(EasyMock.anyObject(ByteBuffer.class), EasyMock.anyInt())).andAnswer(new IAnswer<Integer>() {
-            @Override
-            public Integer answer() {
-                ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
-                buffer.put(fileChannelContent.getBytes());
-                return -1;
-            }
+        when(channelMock.read(any(), anyLong())).then(invocation -> {
+            ByteBuffer bufferArg = invocation.getArgument(0);
+            bufferArg.put(fileChannelContent.getBytes());
+            return -1;
         });
-        EasyMock.replay(channelMock);
         Utils.readFully(channelMock, buffer, 0L);
         assertEquals("abcdefghkl", new String(buffer.array(), 0, buffer.position()));
-        assertEquals(buffer.position(), channelMock.size());
+        assertEquals(fileChannelContent.length(), buffer.position());
         assertTrue(buffer.hasRemaining());
-        EasyMock.verify(channelMock);
+        verify(channelMock, atLeastOnce()).read(any(), anyLong());
     }
 
     /**
      * Expectation setter for multiple reads where each one reads random bytes to the buffer.
      *
      * @param channelMock           The mocked FileChannel object
-     * @param expectedBufferContent buffer that will be updated to contain the expected buffer content after each
-     *                              `FileChannel.read` invocation
      * @param bufferSize            The buffer size
+     * @return                      The content the buffer is expected to hold once all stubbed reads are consumed
      * @throws IOException          If an I/O error occurs
      */
-    private void fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock,
-                                                          final StringBuilder expectedBufferContent,
-                                                          final int bufferSize) throws IOException {
+    private String fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock,
+                                                            final int bufferSize) throws IOException {
         final int step = 20;
         final Random random = new Random();
         int remainingBytes = bufferSize;
+        OngoingStubbing<Integer> when = when(channelMock.read(any(), anyLong()));
+        StringBuilder expectedBufferContent = new StringBuilder();
         while (remainingBytes > 0) {
-            final int mockedBytesRead = remainingBytes < step ? remainingBytes : random.nextInt(step);
-            final StringBuilder sb = new StringBuilder();
-            EasyMock.expect(channelMock.read(EasyMock.anyObject(ByteBuffer.class), EasyMock.anyInt())).andAnswer(new IAnswer<Integer>() {
-                @Override
-                public Integer answer() {
-                    ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
-                    for (int i = 0; i < mockedBytesRead; i++)
-                        sb.append("a");
-                    buffer.put(sb.toString().getBytes());
-                    expectedBufferContent.append(sb);
-                    return mockedBytesRead;
-                }
+            final int bytesRead = remainingBytes < step ? remainingBytes : random.nextInt(step);
+            final String stringRead = IntStream.range(0, bytesRead).mapToObj(i -> "a").collect(Collectors.joining());
+            expectedBufferContent.append(stringRead);
+            when = when.then(invocation -> {
+                ByteBuffer buffer = invocation.getArgument(0);
+                buffer.put(stringRead.getBytes());
+                return bytesRead;
             });
-            remainingBytes -= mockedBytesRead;
+            remainingBytes -= bytesRead;
         }
+        return expectedBufferContent.toString();
     }
 
     private static class TestCloseable implements Closeable {
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 18a2534..e22885e 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -74,6 +74,7 @@ versions += [
   lz4: "1.5.0",
   mavenArtifact: "3.5.4",
   metrics: "2.2.0",
+  mockito: "2.23.0",
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
   powermock: "2.0.0-beta.5",
   reflections: "0.9.11",
@@ -126,6 +127,7 @@ libs += [
   log4j: "log4j:log4j:$versions.log4j",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
+  mockitoCore: "org.mockito:mockito-core:$versions.mockito",
   powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",
   powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock",
   reflections: "org.reflections:reflections:$versions.reflections",


Mime
View raw message