kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.8 updated: JUnit extensions for integration tests (#9986)
Date Wed, 10 Feb 2021 20:57:02 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new c3372c5  JUnit extensions for integration tests (#9986)
c3372c5 is described below

commit c3372c5a55f9e70c81f5fc9cc8d9c10ba1f17066
Author: David Arthur <mumrah@gmail.com>
AuthorDate: Tue Feb 9 11:49:33 2021 -0500

    JUnit extensions for integration tests (#9986)
    
    Adds JUnit 5 extension for running the same test with different types of clusters.
    See core/src/test/java/kafka/test/junit/README.md for details
---
 build.gradle                                       |   5 +-
 checkstyle/import-control-core.xml                 |  15 ++
 checkstyle/suppressions.xml                        |   1 +
 core/src/test/java/kafka/test/ClusterConfig.java   | 207 +++++++++++++++++
 .../src/test/java/kafka/test/ClusterGenerator.java |  25 ++
 core/src/test/java/kafka/test/ClusterInstance.java |  97 ++++++++
 .../java/kafka/test/ClusterTestExtensionsTest.java | 112 +++++++++
 .../test/java/kafka/test/annotation/AutoStart.java |  24 ++
 .../test/annotation/ClusterConfigProperty.java     |  32 +++
 .../kafka/test/annotation/ClusterTemplate.java     |  55 +++++
 .../java/kafka/test/annotation/ClusterTest.java    |  44 ++++
 .../kafka/test/annotation/ClusterTestDefaults.java |  42 ++++
 .../java/kafka/test/annotation/ClusterTests.java   |  35 +++
 core/src/test/java/kafka/test/annotation/Type.java |  28 +++
 .../junit/ClusterInstanceParameterResolver.java    |  68 ++++++
 .../kafka/test/junit/ClusterTestExtensions.java    | 220 ++++++++++++++++++
 .../kafka/test/junit/GenericParameterResolver.java |  51 +++++
 core/src/test/java/kafka/test/junit/README.md      | 139 ++++++++++++
 .../test/junit/ZkClusterInvocationContext.java     | 252 +++++++++++++++++++++
 .../scala/integration/kafka/api/SaslSetup.scala    |   2 +-
 .../kafka/server/IntegrationTestUtils.scala        | 115 ++++++++++
 .../server/AbstractApiVersionsRequestTest.scala    |  23 +-
 .../unit/kafka/server/ApiVersionsRequestTest.scala |  45 ++--
 .../kafka/server/ClientQuotasRequestTest.scala     |  95 ++++----
 .../kafka/server/SaslApiVersionsRequestTest.scala  |  69 +++---
 25 files changed, 1690 insertions(+), 111 deletions(-)

diff --git a/build.gradle b/build.gradle
index 602645d..790cd11 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1013,7 +1013,10 @@ project(':core') {
     }
     test {
       java {
-        srcDirs = ["src/generated/java", "src/test/java"]
+        srcDirs = []
+      }
+      scala {
+        srcDirs = ["src/test/java", "src/test/scala"]
       }
     }
   }
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 6e5042f..e9653ba 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -58,4 +58,19 @@
     <allow pkg="org.apache.kafka.clients" />
   </subpackage>
 
+  <subpackage name="test">
+    <allow pkg="kafka.test.annotation"/>
+    <allow pkg="kafka.test.junit"/>
+    <allow pkg="kafka.network"/>
+    <allow pkg="kafka.api"/>
+    <allow pkg="kafka.server"/>
+    <allow pkg="org.apache.kafka.clients.admin"/>
+    <allow pkg="integration.kafka.server" class="IntegrationTestHelper"/>
+    <subpackage name="annotation">
+      <allow pkg="kafka.test"/>
+    </subpackage>
+    <subpackage name="junit">
+      <allow pkg="kafka.test"/>
+    </subpackage>
+  </subpackage>
 </import-control>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 321a171..76690bb 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -23,6 +23,7 @@
     <!-- core -->
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
               files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
+    <suppress checks="NPathComplexity" files="ClusterTestExtensions.java"/>
 
     <!-- Clients -->
     <suppress id="dontUseSystemExit"
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java
new file mode 100644
index 0000000..db5f14e
--- /dev/null
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Represents a requested configuration of a Kafka cluster for integration testing
+ */
+public class ClusterConfig {
+
+    private final Type type;
+    private final int brokers;
+    private final int controllers;
+    private final String name;
+    private final boolean autoStart;
+
+    private final SecurityProtocol securityProtocol;
+    private final String listenerName;
+    private final File trustStoreFile;
+
+    private final Properties serverProperties = new Properties();
+    private final Properties producerProperties = new Properties();
+    private final Properties consumerProperties = new Properties();
+    private final Properties adminClientProperties = new Properties();
+    private final Properties saslServerProperties = new Properties();
+    private final Properties saslClientProperties = new Properties();
+
+    ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
+                  SecurityProtocol securityProtocol, String listenerName, File trustStoreFile) {
+        this.type = type;
+        this.brokers = brokers;
+        this.controllers = controllers;
+        this.name = name;
+        this.autoStart = autoStart;
+        this.securityProtocol = securityProtocol;
+        this.listenerName = listenerName;
+        this.trustStoreFile = trustStoreFile;
+    }
+
+    public Type clusterType() {
+        return type;
+    }
+
+    public int numBrokers() {
+        return brokers;
+    }
+
+    public int numControllers() {
+        return controllers;
+    }
+
+    public Optional<String> name() {
+        return Optional.ofNullable(name);
+    }
+
+    public boolean isAutoStart() {
+        return autoStart;
+    }
+
+    public Properties serverProperties() {
+        return serverProperties;
+    }
+
+    public Properties producerProperties() {
+        return producerProperties;
+    }
+
+    public Properties consumerProperties() {
+        return consumerProperties;
+    }
+
+    public Properties adminClientProperties() {
+        return adminClientProperties;
+    }
+
+    public Properties saslServerProperties() {
+        return saslServerProperties;
+    }
+
+    public Properties saslClientProperties() {
+        return saslClientProperties;
+    }
+
+    public SecurityProtocol securityProtocol() {
+        return securityProtocol;
+    }
+
+    public Optional<String> listenerName() {
+        return Optional.ofNullable(listenerName);
+    }
+
+    public Optional<File> trustStoreFile() {
+        return Optional.ofNullable(trustStoreFile);
+    }
+
+    public Map<String, String> nameTags() {
+        Map<String, String> tags = new LinkedHashMap<>(3);
+        name().ifPresent(name -> tags.put("Name", name));
+        tags.put("security", securityProtocol.name());
+        listenerName().ifPresent(listener -> tags.put("listener", listener));
+        return tags;
+    }
+
+    public ClusterConfig copyOf() {
+        ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile);
+        copy.serverProperties.putAll(serverProperties);
+        copy.producerProperties.putAll(producerProperties);
+        copy.consumerProperties.putAll(consumerProperties);
+        copy.saslServerProperties.putAll(saslServerProperties);
+        copy.saslClientProperties.putAll(saslClientProperties);
+        return copy;
+    }
+
+    public static Builder defaultClusterBuilder() {
+        return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT);
+    }
+
+    public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
+        return new Builder(type, brokers, controllers, autoStart, securityProtocol);
+    }
+
+    public static class Builder {
+        private Type type;
+        private int brokers;
+        private int controllers;
+        private String name;
+        private boolean autoStart;
+        private SecurityProtocol securityProtocol;
+        private String listenerName;
+        private File trustStoreFile;
+
+        Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
+            this.type = type;
+            this.brokers = brokers;
+            this.controllers = controllers;
+            this.autoStart = autoStart;
+            this.securityProtocol = securityProtocol;
+        }
+
+        public Builder type(Type type) {
+            this.type = type;
+            return this;
+        }
+
+        public Builder brokers(int brokers) {
+            this.brokers = brokers;
+            return this;
+        }
+
+        public Builder controllers(int controllers) {
+            this.controllers = controllers;
+            return this;
+        }
+
+        public Builder name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        public Builder autoStart(boolean autoStart) {
+            this.autoStart = autoStart;
+            return this;
+        }
+
+        public Builder securityProtocol(SecurityProtocol securityProtocol) {
+            this.securityProtocol = securityProtocol;
+            return this;
+        }
+
+        public Builder listenerName(String listenerName) {
+            this.listenerName = listenerName;
+            return this;
+        }
+
+        public Builder trustStoreFile(File trustStoreFile) {
+            this.trustStoreFile = trustStoreFile;
+            return this;
+        }
+
+        public ClusterConfig build() {
+            return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile);
+        }
+    }
+}
diff --git a/core/src/test/java/kafka/test/ClusterGenerator.java b/core/src/test/java/kafka/test/ClusterGenerator.java
new file mode 100644
index 0000000..97a2463
--- /dev/null
+++ b/core/src/test/java/kafka/test/ClusterGenerator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import java.util.function.Consumer;
+
+@FunctionalInterface
+public interface ClusterGenerator extends Consumer<ClusterConfig> {
+
+}
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
new file mode 100644
index 0000000..8732aa9
--- /dev/null
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.network.SocketServer;
+import kafka.test.annotation.ClusterTest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.util.Collection;
+import java.util.Properties;
+
+public interface ClusterInstance {
+
+    enum ClusterType {
+        ZK,
+        // RAFT
+    }
+
+    /**
+     * Cluster type. For now, only ZK is supported.
+     */
+    ClusterType clusterType();
+
+    /**
+     * The cluster configuration used to create this cluster. Changing data in this instance through this accessor will
+     * have no affect on the cluster since it is already provisioned.
+     */
+    ClusterConfig config();
+
+    /**
+     * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
+     * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT
+     */
+    ListenerName clientListener();
+
+    /**
+     * The broker connect string which can be used by clients for bootstrapping
+     */
+    String bootstrapServers();
+
+    /**
+     * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
+     * acting as the controller (since ZK controllers serve both broker and controller roles).
+     */
+    Collection<SocketServer> brokerSocketServers();
+
+    /**
+     * A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also
+     * currently the active controller. For Raft-based clusters, this will return all controller servers.
+     */
+    Collection<SocketServer> controllerSocketServers();
+
+    /**
+     * Return any one of the broker servers. Throw an error if none are found
+     */
+    SocketServer anyBrokerSocketServer();
+
+    /**
+     * Return any one of the controller servers. Throw an error if none are found
+     */
+    SocketServer anyControllerSocketServer();
+
+    /**
+     * The underlying object which is responsible for setting up and tearing down the cluster.
+     */
+    Object getUnderlying();
+
+    default <T> T getUnderlying(Class<T> asClass) {
+        return asClass.cast(getUnderlying());
+    }
+
+    Admin createAdminClient(Properties configOverrides);
+
+    default Admin createAdminClient() {
+        return createAdminClient(new Properties());
+    }
+
+    void start();
+
+    void stop();
+}
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
new file mode 100644
index 0000000..6818e43
--- /dev/null
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.AutoStart;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.ClusterTests;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+
+@ClusterTestDefaults(clusterType = Type.ZK)   // Set defaults for a few params in @ClusterTest(s)
+@ExtendWith(ClusterTestExtensions.class)
+public class ClusterTestExtensionsTest {
+
+    private final ClusterInstance clusterInstance;
+    private final ClusterConfig config;
+
+    ClusterTestExtensionsTest(ClusterInstance clusterInstance, ClusterConfig config) {     // Constructor injections
+        this.clusterInstance = clusterInstance;
+        this.config = config;
+    }
+
+    // Static methods can generate cluster configurations
+    static void generate1(ClusterGenerator clusterGenerator) {
+        clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated Test").build());
+    }
+
+    // BeforeEach run after class construction, but before cluster initialization and test invocation
+    @BeforeEach
+    public void beforeEach(ClusterConfig config) {
+        Assertions.assertSame(this.config, config, "Injected objects should be the same");
+        config.serverProperties().put("before", "each");
+    }
+
+    // AfterEach runs after test invocation and cluster teardown
+    @AfterEach
+    public void afterEach(ClusterConfig config) {
+        Assertions.assertSame(this.config, config, "Injected objects should be the same");
+    }
+
+    // With no params, configuration comes from the annotation defaults as well as @ClusterTestDefaults (if present)
+    @ClusterTest
+    public void testClusterTest(ClusterConfig config, ClusterInstance clusterInstance) {
+        Assertions.assertSame(this.config, config, "Injected objects should be the same");
+        Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected objects should be the same");
+        Assertions.assertEquals(clusterInstance.clusterType(), ClusterInstance.ClusterType.ZK); // From the class level default
+        Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("before"), "each");
+    }
+
+    // generate1 is a template method which generates any number of cluster configs
+    @ClusterTemplate("generate1")
+    public void testClusterTemplate() {
+        Assertions.assertEquals(clusterInstance.clusterType(), ClusterInstance.ClusterType.ZK,
+            "generate1 provided a Zk cluster, so we should see that here");
+        Assertions.assertEquals(clusterInstance.config().name().orElse(""), "Generated Test",
+            "generate 1 named this cluster config, so we should see that here");
+        Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("before"), "each");
+    }
+
+    // Multiple @ClusterTest can be used with @ClusterTests
+    @ClusterTests({
+        @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = {
+            @ClusterConfigProperty(key = "foo", value = "bar"),
+            @ClusterConfigProperty(key = "spam", value = "eggs")
+        }),
+        @ClusterTest(name = "cluster-tests-2", clusterType = Type.ZK, serverProperties = {
+            @ClusterConfigProperty(key = "foo", value = "baz"),
+            @ClusterConfigProperty(key = "spam", value = "eggz")
+        })
+    })
+    public void testClusterTests() {
+        if (clusterInstance.config().name().filter(name -> name.equals("cluster-tests-1")).isPresent()) {
+            Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("foo"), "bar");
+            Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("spam"), "eggs");
+        } else if (clusterInstance.config().name().filter(name -> name.equals("cluster-tests-2")).isPresent()) {
+            Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("foo"), "baz");
+            Assertions.assertEquals(clusterInstance.config().serverProperties().getProperty("spam"), "eggz");
+        } else {
+            Assertions.fail("Unknown cluster config " + clusterInstance.config().name());
+        }
+    }
+
+    @ClusterTest(autoStart = AutoStart.NO)
+    public void testNoAutoStart() {
+        Assertions.assertThrows(RuntimeException.class, clusterInstance::anyBrokerSocketServer);
+        clusterInstance.start();
+        Assertions.assertNotNull(clusterInstance.anyBrokerSocketServer());
+    }
+}
diff --git a/core/src/test/java/kafka/test/annotation/AutoStart.java b/core/src/test/java/kafka/test/annotation/AutoStart.java
new file mode 100644
index 0000000..24fdedf
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/AutoStart.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+public enum AutoStart {
+    YES,
+    NO,
+    DEFAULT
+}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java b/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
new file mode 100644
index 0000000..eb1434d
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Documented
+@Target({ElementType.ANNOTATION_TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ClusterConfigProperty {
+    String key();
+    String value();
+}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTemplate.java b/core/src/test/java/kafka/test/annotation/ClusterTemplate.java
new file mode 100644
index 0000000..f776b4e
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/ClusterTemplate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Used to indicate that a test should call the method given by {@link #value()} to generate a number of
+ * cluster configurations. The method specified by the value should accept a single argument of the type
+ * {@link ClusterGenerator}. Any return value from the method is ignore. A test invocation
+ * will be generated for each {@link ClusterConfig} provided to the ClusterGenerator instance.
+ *
+ * The method given here must be static since it is invoked before any tests are actually run. Each test generated
+ * by this annotation will run as if it was defined as a separate test method with its own
+ * {@link org.junit.jupiter.api.Test}. That is to say, each generated test invocation will have a separate lifecycle.
+ *
+ * This annotation may be used in conjunction with {@link ClusterTest} and {@link ClusterTests} which also yield
+ * ClusterConfig instances.
+ *
+ * For Scala tests, the method should be defined in a companion object with the same name as the test class.
+ */
+@Documented
+@Target({METHOD})
+@Retention(RUNTIME)
+@TestTemplate
+public @interface ClusterTemplate {
+    /**
+     * Specify the static method used for generating cluster configs
+     */
+    String value();
+}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
new file mode 100644
index 0000000..687255c
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Documented
+@Target({METHOD})
+@Retention(RUNTIME)
+@TestTemplate
+public @interface ClusterTest {
+    Type clusterType() default Type.DEFAULT;
+    int brokers() default 0;
+    int controllers() default 0;
+    AutoStart autoStart() default AutoStart.DEFAULT;
+
+    String name() default "";
+    SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
+    String listener() default "";
+    ClusterConfigProperty[] serverProperties() default {};
+}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
new file mode 100644
index 0000000..cd8a66d
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import kafka.test.junit.ClusterTestExtensions;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Used to set class level defaults for any test template methods annotated with {@link ClusterTest} or
+ * {@link ClusterTests}. The default values here are also used as the source for defaults in
+ * {@link ClusterTestExtensions}.
+ */
+@Documented
+@Target({TYPE})
+@Retention(RUNTIME)
+public @interface ClusterTestDefaults {
+    Type clusterType() default Type.ZK;
+    int brokers() default 1;
+    int controllers() default 1;
+    boolean autoStart() default true;
+}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTests.java b/core/src/test/java/kafka/test/annotation/ClusterTests.java
new file mode 100644
index 0000000..64905f8
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/ClusterTests.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+import org.junit.jupiter.api.TestTemplate;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Documented
+@Target({METHOD})
+@Retention(RUNTIME)
+@TestTemplate
+public @interface ClusterTests {
+    ClusterTest[] value();
+}
diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java
new file mode 100644
index 0000000..8e8f236
--- /dev/null
+++ b/core/src/test/java/kafka/test/annotation/Type.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.annotation;
+
+/**
+ * The type of cluster config being requested. Used by {@link kafka.test.ClusterConfig} and the test annotations.
+ */
+public enum Type {
+    // RAFT,
+    ZK,
+    BOTH,
+    DEFAULT
+}
diff --git a/core/src/test/java/kafka/test/junit/ClusterInstanceParameterResolver.java b/core/src/test/java/kafka/test/junit/ClusterInstanceParameterResolver.java
new file mode 100644
index 0000000..3329e32
--- /dev/null
+++ b/core/src/test/java/kafka/test/junit/ClusterInstanceParameterResolver.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.test.ClusterInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+import java.lang.reflect.Executable;
+
+import static org.junit.platform.commons.util.AnnotationUtils.isAnnotated;
+
+/**
+ * This resolver provides an instance of {@link ClusterInstance} to a test invocation. The instance represents the
+ * underlying cluster being run for the current test. It can be injected into test methods or into the class
+ * constructor.
+ *
+ * N.B., if injected into the class constructor, the instance will not be fully initialized until the actual test method
+ * is being invoked. This is because the cluster is not started until after class construction and after "before"
+ * lifecycle methods have been run. Constructor injection is meant for convenience so helper methods can be defined on
+ * the test which can rely on a class member rather than an argument for ClusterInstance.
+ */
+public class ClusterInstanceParameterResolver implements ParameterResolver {
+    private final ClusterInstance clusterInstance;
+
+    ClusterInstanceParameterResolver(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
+        if (!parameterContext.getParameter().getType().equals(ClusterInstance.class)) {
+            return false;
+        }
+
+        if (!extensionContext.getTestMethod().isPresent()) {
+            // Allow this to be injected into the class
+            extensionContext.getRequiredTestClass();
+            return true;
+        } else {
+            // If we're injecting into a method, make sure it's a test method and not a lifecycle method
+            Executable parameterizedMethod = parameterContext.getParameter().getDeclaringExecutable();
+            return isAnnotated(parameterizedMethod, TestTemplate.class);
+        }
+    }
+
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
+        return clusterInstance;
+    }
+}
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
new file mode 100644
index 0000000..872b669
--- /dev/null
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTests;
+import kafka.test.annotation.Type;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.junit.platform.commons.util.ReflectionUtils;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+/**
+ * This class is a custom JUnit extension that will generate some number of test invocations depending on the processing
+ * of a few custom annotations. These annotations are placed on so-called test template methods. Template methods look
+ * like normal JUnit test methods, but instead of being invoked directly, they are used as templates for generating
+ * multiple test invocations.
+ *
+ * Test class that use this extension should use one of the following annotations on each template method:
+ *
+ * <ul>
+ *     <li>{@link ClusterTest}, define a single cluster configuration</li>
+ *     <li>{@link ClusterTests}, provide multiple instances of @ClusterTest</li>
+ *     <li>{@link ClusterTemplate}, define a static method that generates cluster configurations</li>
+ * </ul>
+ *
+ * Any combination of these annotations may be used on a given test template method. If no test invocations are
+ * generated after processing the annotations, an error is thrown.
+ *
+ * Depending on which annotations are used, and what values are given, different {@link ClusterConfig} will be
+ * generated. Each ClusterConfig is used to create an underlying Kafka cluster that is used for the actual test
+ * invocation.
+ *
+ * For example:
+ *
+ * <pre>
+ * &#64;ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+ * class SomeIntegrationTest {
+ *   &#64;ClusterTest(brokers = 1, controllers = 1, clusterType = ClusterType.Both)
+ *   def someTest(): Unit = {
+ *     assertTrue(condition)
+ *   }
+ * }
+ * </pre>
+ *
+ * will generate two invocations of "someTest" (since ClusterType.Both was given). For each invocation, the test class
+ * SomeIntegrationTest will be instantiated, lifecycle methods (before/after) will be run, and "someTest" will be invoked.
+ *
+ **/
+public class ClusterTestExtensions implements TestTemplateInvocationContextProvider {
+    @Override
+    public boolean supportsTestTemplate(ExtensionContext context) {
+        return true;
+    }
+
+    @Override
+    public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context) {
+        ClusterTestDefaults defaults = getClusterTestDefaults(context.getRequiredTestClass());
+        List<TestTemplateInvocationContext> generatedContexts = new ArrayList<>();
+
+        // Process the @ClusterTemplate annotation
+        ClusterTemplate clusterTemplateAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class);
+        if (clusterTemplateAnnot != null) {
+            processClusterTemplate(context, clusterTemplateAnnot, generatedContexts::add);
+            if (generatedContexts.size() == 0) {
+                throw new IllegalStateException("ClusterConfig generator method should provide at least one config");
+            }
+        }
+
+        // Process single @ClusterTest annotation
+        ClusterTest clusterTestAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTest.class);
+        if (clusterTestAnnot != null) {
+            processClusterTest(clusterTestAnnot, defaults, generatedContexts::add);
+        }
+
+        // Process multiple @ClusterTest annotation within @ClusterTests
+        ClusterTests clusterTestsAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTests.class);
+        if (clusterTestsAnnot != null) {
+            for (ClusterTest annot : clusterTestsAnnot.value()) {
+                processClusterTest(annot, defaults, generatedContexts::add);
+            }
+        }
+
+        if (generatedContexts.size() == 0) {
+            throw new IllegalStateException("Please annotate test methods with @ClusterTemplate, @ClusterTest, or " +
+                    "@ClusterTests when using the ClusterTestExtensions provider");
+        }
+
+        return generatedContexts.stream();
+    }
+
+    private void processClusterTemplate(ExtensionContext context, ClusterTemplate annot,
+                                        Consumer<TestTemplateInvocationContext> testInvocations) {
+        // If specified, call cluster config generated method (must be static)
+        List<ClusterConfig> generatedClusterConfigs = new ArrayList<>();
+        if (!annot.value().isEmpty()) {
+            generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add);
+        } else {
+            // Ensure we have at least one cluster config
+            generatedClusterConfigs.add(ClusterConfig.defaultClusterBuilder().build());
+        }
+
+        generatedClusterConfigs.forEach(config -> {
+            if (config.clusterType() == Type.ZK) {
+                testInvocations.accept(new ZkClusterInvocationContext(config.copyOf()));
+            } else {
+                throw new IllegalStateException("Unknown cluster type " + config.clusterType());
+            }
+        });
+    }
+
+    private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) {
+        Object testInstance = context.getTestInstance().orElse(null);
+        Method method = ReflectionUtils.getRequiredMethod(context.getRequiredTestClass(), generateClustersMethods, ClusterGenerator.class);
+        ReflectionUtils.invokeMethod(method, testInstance, generator);
+    }
+
+    private void processClusterTest(ClusterTest annot, ClusterTestDefaults defaults,
+                                    Consumer<TestTemplateInvocationContext> testInvocations) {
+        final Type type;
+        if (annot.clusterType() == Type.DEFAULT) {
+            type = defaults.clusterType();
+        } else {
+            type = annot.clusterType();
+        }
+
+        final int brokers;
+        if (annot.brokers() == 0) {
+            brokers = defaults.brokers();
+        } else {
+            brokers = annot.brokers();
+        }
+
+        final int controllers;
+        if (annot.controllers() == 0) {
+            controllers = defaults.controllers();
+        } else {
+            controllers = annot.controllers();
+        }
+
+        if (brokers <= 0 || controllers <= 0) {
+            throw new IllegalArgumentException("Number of brokers/controllers must be greater than zero.");
+        }
+
+        final boolean autoStart;
+        switch (annot.autoStart()) {
+            case YES:
+                autoStart = true;
+                break;
+            case NO:
+                autoStart = false;
+                break;
+            case DEFAULT:
+                autoStart = defaults.autoStart();
+                break;
+            default:
+                throw new IllegalStateException();
+        }
+
+        ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart, annot.securityProtocol());
+        if (!annot.name().isEmpty()) {
+            builder.name(annot.name());
+        }
+        if (!annot.listener().isEmpty()) {
+            builder.listenerName(annot.listener());
+        }
+
+        Properties properties = new Properties();
+        for (ClusterConfigProperty property : annot.serverProperties()) {
+            properties.put(property.key(), property.value());
+        }
+
+        switch (type) {
+            case ZK:
+            case BOTH:
+                ClusterConfig config = builder.build();
+                config.serverProperties().putAll(properties);
+                testInvocations.accept(new ZkClusterInvocationContext(config));
+                break;
+        }
+    }
+
+    private ClusterTestDefaults getClusterTestDefaults(Class<?> testClass) {
+        return Optional.ofNullable(testClass.getDeclaredAnnotation(ClusterTestDefaults.class))
+            .orElseGet(() -> EmptyClass.class.getDeclaredAnnotation(ClusterTestDefaults.class));
+    }
+
+    @ClusterTestDefaults
+    private final static class EmptyClass {
+        // Just used as a convenience to get default values from the annotation
+    }
+}
diff --git a/core/src/test/java/kafka/test/junit/GenericParameterResolver.java b/core/src/test/java/kafka/test/junit/GenericParameterResolver.java
new file mode 100644
index 0000000..70387e1
--- /dev/null
+++ b/core/src/test/java/kafka/test/junit/GenericParameterResolver.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+/**
+ * This resolver is used for supplying any type of object to the test invocation. It does not restrict where the given
+ * type can be injected, it simply checks if the requested injection type matches the type given in the constructor. If
+ * it matches, the given object is returned.
+ *
+ * This is useful for injecting helper objects and objects which can be fully initialized before the test lifecycle
+ * begins.
+ */
+public class GenericParameterResolver<T> implements ParameterResolver {
+
+    private final T instance;
+    private final Class<T> clazz;
+
+    GenericParameterResolver(T instance, Class<T> clazz) {
+        this.instance = instance;
+        this.clazz = clazz;
+    }
+
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
+        return parameterContext.getParameter().getType().equals(clazz);
+    }
+
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) {
+        return instance;
+    }
+}
diff --git a/core/src/test/java/kafka/test/junit/README.md b/core/src/test/java/kafka/test/junit/README.md
new file mode 100644
index 0000000..dbd2bf4
--- /dev/null
+++ b/core/src/test/java/kafka/test/junit/README.md
@@ -0,0 +1,139 @@
+This document describes a custom JUnit extension which allows for running the same JUnit tests against multiple Kafka 
+cluster configurations.
+
+# Annotations
+
+A new `@ClusterTest` annotation is introduced which allows for a test to declaratively configure an underlying Kafka cluster.
+
+```scala
+@ClusterTest
+def testSomething(): Unit = { ... }
+```
+
+This annotation has fields for cluster type and number of brokers, as well as commonly parameterized configurations. 
+Arbitrary server properties can also be provided in the annotation:
+
+```java
+@ClusterTest(clusterType = Type.Zk, securityProtocol = "PLAINTEXT", properties = {
+  @ClusterProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
+  @ClusterProperty(key = "socket.send.buffer.bytes", value = "10240"),
+})
+void testSomething() { ... }
+```
+
+Multiple `@ClusterTest` annotations can be given to generate more than one test invocation for the annotated method.
+
+```scala
+@ClusterTests(Array(
+    @ClusterTest(securityProtocol = "PLAINTEXT"),
+    @ClusterTest(securityProtocol = "SASL_PLAINTEXT")
+))
+def testSomething(): Unit = { ... }
+```
+
+A class-level `@ClusterTestDefaults` annotation is added to provide default values for `@ClusterTest` defined within 
+the class. The intention here is to reduce repetitive annotation declarations and also make changing defaults easier 
+for a class with many test cases.
+
+# Dynamic Configuration
+
+In order to allow for more flexible cluster configuration, a `@ClusterTemplate` annotation is also introduced. This 
+annotation takes a single string value which references a static method on the test class. This method is used to 
+produce any number of test configurations using a fluent builder style API.
+
+```java
+@ClusterTemplate("generateConfigs")
+void testSomething() { ... }
+
+static void generateConfigs(ClusterGenerator clusterGenerator) {
+  clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
+      .name("Generated Test 1")
+      .serverProperties(props1)
+      .ibp("2.7-IV1")
+      .build());
+  clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
+      .name("Generated Test 2")
+      .serverProperties(props2)
+      .ibp("2.7-IV2")
+      .build());
+  clusterGenerator.accept(ClusterConfig.defaultClusterBuilder()
+      .name("Generated Test 3")
+      .serverProperties(props3)
+      .build());
+}
+```
+
+This "escape hatch" from the simple declarative style configuration makes it easy to dynamically configure clusters.
+
+
+# JUnit Extension
+
+One thing to note is that our "test*" methods are no longer _tests_, but rather they are test templates. We have added 
+a JUnit extension called `ClusterTestExtensions` which knows how to process these annotations in order to generate test 
+invocations. Test classes that wish to make use of these annotations need to explicitly register this extension:
+
+```scala
+import kafka.test.junit.ClusterTestExtensions
+
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class ApiVersionsRequestTest {
+   ...
+}
+```
+
+# JUnit Lifecycle
+
+The lifecycle of a test class that is extended with `ClusterTestExtensions` follows:
+
+* JUnit discovers test template methods that are annotated with `@ClusterTest`, `@ClusterTests`, or `@ClusterTemplate`
+* `ClusterTestExtensions` is called for each of these template methods in order to generate some number of test invocations
+
+For each generated invocation:
+* Static `@BeforeAll` methods are called
+* Test class is instantiated
+* Non-static `@BeforeEach` methods are called
+* Kafka Cluster is started
+* Test method is invoked
+* Kafka Cluster is stopped
+* Non-static `@AfterEach` methods are called
+* Static `@AfterAll` methods are called
+
+`@BeforeEach` methods give an opportunity to setup additional test dependencies before the cluster is started. 
+
+# Dependency Injection
+
+A few classes are introduced to provide context to the underlying cluster and to provide reusable functionality that was 
+previously garnered from the test hierarchy.
+
+* ClusterConfig: a mutable cluster configuration, includes cluster type, number of brokers, properties, etc
+* ClusterInstance: a shim to the underlying class that actually runs the cluster, provides access to things like SocketServers
+* IntegrationTestHelper: connection related functions taken from IntegrationTestHarness and BaseRequestTest
+
+In order to have one of these objects injected, simply add it as a parameter to your test class, `@BeforeEach` method, or test method.
+
+| Injection | Class | BeforeEach | Test | Notes
+| --- | --- | --- | --- | --- |
+| ClusterConfig | yes | yes | yes* | Once in the test, changing config has no effect |
+| ClusterInstance | yes* | no | yes | Injectable at class level for convenience, can only be accessed inside test |
+| IntegrationTestHelper | yes | yes | yes | - |
+
+```scala
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class SomeTestClass(helper: IntegrationTestHelper) {
+ 
+  @BeforeEach
+  def setup(config: ClusterConfig): Unit = {
+    config.serverProperties().put("foo", "bar")
+  }
+
+  @ClusterTest
+  def testSomething(cluster: ClusterInstance): Unit = {
+    val topics = cluster.createAdminClient().listTopics()
+  }
+}
+```
+
+# Gotchas
+* Test methods annotated with JUnit's `@Test` will still be run, but no cluster will be started and no dependency 
+  injection will happen. This is generally not what you want
+* Even though ClusterConfig is accessible and mutable inside the test method, changing it will have no affect on the cluster 
\ No newline at end of file
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
new file mode 100644
index 0000000..62cc80d
--- /dev/null
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.api.IntegrationTestHarness;
+import kafka.network.SocketServer;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.compat.java8.OptionConverters;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test invocation. Each instance of this
+ * class is provided with a configuration for the cluster.
+ *
+ * This context also provides parameter resolvers for:
+ *
+ * <ul>
+ *     <li>ClusterConfig (the same instance passed to the constructor)</li>
+ *     <li>ClusterInstance (includes methods to expose underlying SocketServer-s)</li>
+ *     <li>IntegrationTestHelper (helper methods)</li>
+ * </ul>
+ */
+public class ZkClusterInvocationContext implements TestTemplateInvocationContext {
+
+    private final ClusterConfig clusterConfig;
+    private final AtomicReference<IntegrationTestHarness> clusterReference;
+
+    public ZkClusterInvocationContext(ClusterConfig clusterConfig) {
+        this.clusterConfig = clusterConfig;
+        this.clusterReference = new AtomicReference<>();
+    }
+
+    @Override
+    public String getDisplayName(int invocationIndex) {
+        String clusterDesc = clusterConfig.nameTags().entrySet().stream()
+            .map(Object::toString)
+            .collect(Collectors.joining(", "));
+        return String.format("[Zk %d] %s", invocationIndex, clusterDesc);
+    }
+
+    @Override
+    public List<Extension> getAdditionalExtensions() {
+        if (clusterConfig.numControllers() != 1) {
+            throw new IllegalArgumentException("For ZK clusters, please specify exactly 1 controller.");
+        }
+        ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, clusterReference);
+        return Arrays.asList(
+            (BeforeTestExecutionCallback) context -> {
+                // We have to wait to actually create the underlying cluster until after our @BeforeEach methods
+                // have run. This allows tests to set up external dependencies like ZK, MiniKDC, etc.
+                // However, since we cannot create this instance until we are inside the test invocation, we have
+                // to use a container class (AtomicReference) to provide this cluster object to the test itself
+
+                // This is what tests normally extend from to start a cluster, here we create it anonymously and
+                // configure the cluster using values from ClusterConfig
+                IntegrationTestHarness cluster = new IntegrationTestHarness() {
+                    @Override
+                    public Properties serverConfig() {
+                        return clusterConfig.serverProperties();
+                    }
+
+                    @Override
+                    public Properties adminClientConfig() {
+                        return clusterConfig.adminClientProperties();
+                    }
+
+                    @Override
+                    public Properties consumerConfig() {
+                        return clusterConfig.consumerProperties();
+                    }
+
+                    @Override
+                    public Properties producerConfig() {
+                        return clusterConfig.producerProperties();
+                    }
+
+                    @Override
+                    public SecurityProtocol securityProtocol() {
+                        return clusterConfig.securityProtocol();
+                    }
+
+                    @Override
+                    public ListenerName listenerName() {
+                        return clusterConfig.listenerName().map(ListenerName::normalised)
+                            .orElseGet(() -> ListenerName.forSecurityProtocol(securityProtocol()));
+                    }
+
+                    @Override
+                    public Option<Properties> serverSaslProperties() {
+                        if (clusterConfig.saslServerProperties().isEmpty()) {
+                            return Option.empty();
+                        } else {
+                            return Option.apply(clusterConfig.saslServerProperties());
+                        }
+                    }
+
+                    @Override
+                    public Option<Properties> clientSaslProperties() {
+                        if (clusterConfig.saslClientProperties().isEmpty()) {
+                            return Option.empty();
+                        } else {
+                            return Option.apply(clusterConfig.saslClientProperties());
+                        }
+                    }
+
+                    @Override
+                    public int brokerCount() {
+                        // Controllers are also brokers in zk mode, so just use broker count
+                        return clusterConfig.numBrokers();
+                    }
+
+                    @Override
+                    public Option<File> trustStoreFile() {
+                        return OptionConverters.toScala(clusterConfig.trustStoreFile());
+                    }
+                };
+
+                clusterReference.set(cluster);
+                if (clusterConfig.isAutoStart()) {
+                    clusterShim.start();
+                }
+            },
+            (AfterTestExecutionCallback) context -> clusterShim.stop(),
+            new ClusterInstanceParameterResolver(clusterShim),
+            new GenericParameterResolver<>(clusterConfig, ClusterConfig.class)
+        );
+    }
+
+    public static class ZkClusterInstance implements ClusterInstance {
+
+        final AtomicReference<IntegrationTestHarness> clusterReference;
+        final ClusterConfig config;
+        final AtomicBoolean started = new AtomicBoolean(false);
+        final AtomicBoolean stopped = new AtomicBoolean(false);
+
+        ZkClusterInstance(ClusterConfig config, AtomicReference<IntegrationTestHarness> clusterReference) {
+            this.config = config;
+            this.clusterReference = clusterReference;
+        }
+
+        @Override
+        public String bootstrapServers() {
+            return TestUtils.bootstrapServers(clusterReference.get().servers(), clusterReference.get().listenerName());
+        }
+
+        @Override
+        public Collection<SocketServer> brokerSocketServers() {
+            return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+                    .map(KafkaServer::socketServer)
+                    .collect(Collectors.toList());
+        }
+
+        @Override
+        public ListenerName clientListener() {
+            return clusterReference.get().listenerName();
+        }
+
+        @Override
+        public Collection<SocketServer> controllerSocketServers() {
+            return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+                .filter(broker -> broker.kafkaController().isActive())
+                .map(KafkaServer::socketServer)
+                .collect(Collectors.toList());
+        }
+
+        @Override
+        public SocketServer anyBrokerSocketServer() {
+            return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+                .map(KafkaServer::socketServer)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+        }
+
+        @Override
+        public SocketServer anyControllerSocketServer() {
+            return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+                .filter(broker -> broker.kafkaController().isActive())
+                .map(KafkaServer::socketServer)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+        }
+
+        @Override
+        public ClusterType clusterType() {
+            return ClusterType.ZK;
+        }
+
+        @Override
+        public ClusterConfig config() {
+            return config;
+        }
+
+        @Override
+        public IntegrationTestHarness getUnderlying() {
+            return clusterReference.get();
+        }
+
+        @Override
+        public Admin createAdminClient(Properties configOverrides) {
+            return clusterReference.get().createAdminClient(configOverrides);
+        }
+
+        @Override
+        public void start() {
+            if (started.compareAndSet(false, true)) {
+                clusterReference.get().setUp();
+            }
+        }
+
+        @Override
+        public void stop() {
+            if (stopped.compareAndSet(false, true)) {
+                clusterReference.get().tearDown();
+            }
+        }
+    }
+}
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 2bd55b8..9237011 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -97,7 +97,7 @@ trait SaslSetup {
     (serverKeytabFile.get, clientKeytabFile.get)
   }
 
-  protected def jaasSections(kafkaServerSaslMechanisms: Seq[String],
+  def jaasSections(kafkaServerSaslMechanisms: Seq[String],
                              kafkaClientSaslMechanism: Option[String],
                              mode: SaslSetupMode = Both,
                              kafkaServerEntryName: String = JaasTestUtils.KafkaServerContextName): Seq[JaasSection] = {
diff --git a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
new file mode 100644
index 0000000..c960e35
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import kafka.network.SocketServer
+import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, RequestTestUtils, ResponseHeader}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Utils
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.Properties
+import scala.annotation.nowarn
+import scala.reflect.ClassTag
+
+object IntegrationTestUtils {
+
+  private def sendRequest(socket: Socket, request: Array[Byte]): Unit = {
+    val outgoing = new DataOutputStream(socket.getOutputStream)
+    outgoing.writeInt(request.length)
+    outgoing.write(request)
+    outgoing.flush()
+  }
+
+  def sendWithHeader(request: AbstractRequest, header: RequestHeader, socket: Socket): Unit = {
+    val serializedBytes = Utils.toArray(RequestTestUtils.serializeRequestWithHeader(header, request))
+    sendRequest(socket, serializedBytes)
+  }
+
+  def nextRequestHeader[T <: AbstractResponse](apiKey: ApiKeys,
+                                               apiVersion: Short,
+                                               clientId: String = "client-id",
+                                               correlationIdOpt: Option[Int] = None): RequestHeader = {
+    val correlationId = correlationIdOpt.getOrElse {
+      this.correlationId += 1
+      this.correlationId
+    }
+    new RequestHeader(apiKey, apiVersion, clientId, correlationId)
+  }
+
+  def send(request: AbstractRequest,
+           socket: Socket,
+           clientId: String = "client-id",
+           correlationId: Option[Int] = None): Unit = {
+    val header = nextRequestHeader(request.apiKey, request.version, clientId, correlationId)
+    sendWithHeader(request, header, socket)
+  }
+
+  def receive[T <: AbstractResponse](socket: Socket, apiKey: ApiKeys, version: Short)
+                                    (implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = {
+    val incoming = new DataInputStream(socket.getInputStream)
+    val len = incoming.readInt()
+
+    val responseBytes = new Array[Byte](len)
+    incoming.readFully(responseBytes)
+
+    val responseBuffer = ByteBuffer.wrap(responseBytes)
+    ResponseHeader.parse(responseBuffer, apiKey.responseHeaderVersion(version))
+
+    AbstractResponse.parseResponse(apiKey, responseBuffer, version) match {
+      case response: T => response
+      case response =>
+        throw new ClassCastException(s"Expected response with type ${classTag.runtimeClass}, but found ${response.getClass}")
+    }
+  }
+
+  def sendAndReceive[T <: AbstractResponse](request: AbstractRequest,
+                                            socket: Socket,
+                                            clientId: String = "client-id",
+                                            correlationId: Option[Int] = None)
+                                           (implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
+    send(request, socket, clientId, correlationId)
+    receive[T](socket, request.apiKey, request.version)
+  }
+
+  def connectAndReceive[T <: AbstractResponse](request: AbstractRequest,
+                                               destination: SocketServer,
+                                               listenerName: ListenerName)
+                                              (implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
+    val socket = connect(destination, listenerName)
+    try sendAndReceive[T](request, socket)
+    finally socket.close()
+  }
+
+  protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
+  private var correlationId = 0
+
+  def connect(socketServer: SocketServer,
+              listenerName: ListenerName): Socket = {
+    new Socket("localhost", socketServer.boundPort(listenerName))
+  }
+
+  def clientSecurityProps(certAlias: String): Properties = {
+    TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, None, certAlias, TestUtils.SslCertificateCn, None) // TODO use real trust store and client SASL properties
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index d71764e..f7163ca 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -16,21 +16,28 @@
  */
 package kafka.server
 
-import java.util.Properties
+import integration.kafka.server.IntegrationTestUtils
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.junit.jupiter.api.Assertions._
 
+import java.util.Properties
 import scala.jdk.CollectionConverters._
 
-abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
+abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
+
+  def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
+    IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, cluster.brokerSocketServers().asScala.head, listenerName)
+  }
 
   def controlPlaneListenerName = new ListenerName("CONTROLLER")
 
   // Configure control plane listener to make sure we have separate listeners for testing.
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
+  def brokerPropertyOverrides(properties: Properties): Unit = {
+    val securityProtocol = cluster.config().securityProtocol()
     properties.setProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListenerName.value())
     properties.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${controlPlaneListenerName.value()}:$securityProtocol,$securityProtocol:$securityProtocol")
     properties.setProperty("listeners", s"$securityProtocol://localhost:0,${controlPlaneListenerName.value()}://localhost:0")
@@ -38,15 +45,15 @@ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
   }
 
   def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
-    val overrideHeader = nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
-    val socket = connect(anySocketServer)
+    val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
+    val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
     try {
-      sendWithHeader(request, overrideHeader, socket)
-      receive[ApiVersionsResponse](socket, ApiKeys.API_VERSIONS, 0.toShort)
+      IntegrationTestUtils.sendWithHeader(request, overrideHeader, socket)
+      IntegrationTestUtils.receive[ApiVersionsResponse](socket, ApiKeys.API_VERSIONS, 0.toShort)
     } finally socket.close()
   }
 
-  def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = interBrokerListenerName): Unit = {
+  def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName): Unit = {
     val expectedApis = ApiKeys.brokerApis()
     if (listenerName == controlPlaneListenerName) {
       expectedApis.add(ApiKeys.ENVELOPE)
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 0cdae57..dc35bae 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -17,32 +17,40 @@
 
 package kafka.server
 
+import kafka.test.{ClusterConfig, ClusterInstance}
 import org.apache.kafka.common.message.ApiVersionsRequestData
-import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
+import org.apache.kafka.common.requests.ApiVersionsRequest
+import kafka.test.annotation.ClusterTest
+import kafka.test.junit.ClusterTestExtensions
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.extension.ExtendWith
 
-class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest {
 
-  override def brokerCount: Int = 1
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
 
-  @Test
+  @BeforeEach
+  def setup(config: ClusterConfig): Unit = {
+    super.brokerPropertyOverrides(config.serverProperties())
+  }
+
+  @ClusterTest
   def testApiVersionsRequest(): Unit = {
     val request = new ApiVersionsRequest.Builder().build()
-    val apiVersionsResponse = sendApiVersionsRequest(request)
-    validateApiVersionsResponse(apiVersionsResponse)
+    val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
+    validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
   }
 
-  @Test
+  @ClusterTest
   def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
     val request = new ApiVersionsRequest.Builder().build()
     val apiVersionsResponse = sendApiVersionsRequest(request, super.controlPlaneListenerName)
     validateApiVersionsResponse(apiVersionsResponse, super.controlPlaneListenerName)
   }
 
-  @Test
+  @ClusterTest
   def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build()
     val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest)
@@ -54,30 +62,25 @@ class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest {
     assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
   }
 
-  @Test
+  @ClusterTest
   def testApiVersionsRequestValidationV0(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
-    val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest)
-    validateApiVersionsResponse(apiVersionsResponse)
+    val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
+    validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
   }
 
-  @Test
+  @ClusterTest
   def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
     val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, super.controlPlaneListenerName)
     validateApiVersionsResponse(apiVersionsResponse, super.controlPlaneListenerName)
   }
 
-  @Test
+  @ClusterTest
   def testApiVersionsRequestValidationV3(): Unit = {
     // Invalid request because Name and Version are empty by default
     val apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), 3.asInstanceOf[Short])
-    val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest)
+    val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
     assertEquals(Errors.INVALID_REQUEST.code(), apiVersionsResponse.data.errorCode())
   }
-
-  private def sendApiVersionsRequest(request: ApiVersionsRequest,
-                                     listenerName: ListenerName = super.listenerName): ApiVersionsResponse = {
-    connectAndReceive[ApiVersionsResponse](request, listenerName = listenerName)
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index a011a97..61bd02a 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -17,8 +17,10 @@
 
 package kafka.server
 
-import java.net.InetAddress
+import integration.kafka.server.IntegrationTestUtils
+import kafka.test.ClusterInstance
 
+import java.net.InetAddress
 import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, UserScramCredentialUpsertion}
 import org.apache.kafka.common.config.internals.QuotaConfigs
 import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
@@ -26,23 +28,25 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+
 import java.util
 import java.util.concurrent.{ExecutionException, TimeUnit}
-
 import kafka.utils.TestUtils
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
 
 import scala.jdk.CollectionConverters._
 
-class ClientQuotasRequestTest extends BaseRequestTest {
+@ClusterTestDefaults(clusterType = Type.ZK)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class ClientQuotasRequestTest(cluster: ClusterInstance) {
   private val ConsumerByteRateProp = QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG
   private val ProducerByteRateProp = QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG
   private val RequestPercentageProp = QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG
   private val IpConnectionRateProp = QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG
 
-  override val brokerCount = 1
-
-  @Test
+  @ClusterTest
   def testAlterClientQuotasRequest(): Unit = {
 
     val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user"), (ClientQuotaEntity.CLIENT_ID -> "client-id")).asJava)
@@ -112,7 +116,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     ))
   }
 
-  @Test
+  @ClusterTest
   def testAlterClientQuotasRequestValidateOnly(): Unit = {
     val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
 
@@ -170,11 +174,11 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     ))
   }
 
-  @Test
+  @ClusterTest
   def testClientQuotasForScramUsers(): Unit = {
     val userName = "user"
 
-    val results = createAdminClient().alterUserScramCredentials(util.Arrays.asList(
+    val results = cluster.createAdminClient().alterUserScramCredentials(util.Arrays.asList(
       new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")))
     results.all.get
 
@@ -193,7 +197,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     ))
   }
 
-  @Test
+  @ClusterTest
   def testAlterIpQuotasRequest(): Unit = {
     val knownHost = "1.2.3.4"
     val unknownHost = "2.3.4.5"
@@ -218,7 +222,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
         var currentServerQuota = 0
         TestUtils.waitUntilTrue(
           () => {
-            currentServerQuota = servers.head.socketServer.connectionQuotas.connectionRateForIp(entityIp)
+            currentServerQuota = cluster.brokerSocketServers().asScala.head.connectionQuotas.connectionRateForIp(entityIp)
             Math.abs(expectedMatches(entity) - currentServerQuota) < 0.01
           }, s"Connection quota of $entity is not ${expectedMatches(entity)} but $currentServerQuota")
       }
@@ -251,40 +255,25 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     verifyIpQuotas(allIpEntityFilter, Map.empty)
   }
 
-  @Test
-  def testAlterClientQuotasBadUser(): Unit = {
-    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)
-    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true))
-  }
+  @ClusterTest
+  def testAlterClientQuotasInvalidRequests(): Unit = {
+    var entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)
+    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true))
 
-  @Test
-  def testAlterClientQuotasBadClientId(): Unit = {
-    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "")).asJava)
-    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true))
-  }
+    entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "")).asJava)
+    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true))
 
-  @Test
-  def testAlterClientQuotasBadEntityType(): Unit = {
-    val entity = new ClientQuotaEntity(Map(("" -> "name")).asJava)
-    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true))
-  }
+    entity = new ClientQuotaEntity(Map(("" -> "name")).asJava)
+    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((RequestPercentageProp -> Some(12.34))), validateOnly = true))
 
-  @Test
-  def testAlterClientQuotasEmptyEntity(): Unit = {
-    val entity = new ClientQuotaEntity(Map.empty.asJava)
-    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(ProducerByteRateProp -> Some(10000.5)), validateOnly = true))
-  }
+    entity = new ClientQuotaEntity(Map.empty.asJava)
+    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((ProducerByteRateProp -> Some(10000.5))), validateOnly = true))
 
-  @Test
-  def testAlterClientQuotasBadConfigKey(): Unit = {
-    val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava)
-    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map("bad" -> Some(1.0)), validateOnly = true))
-  }
+    entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
+    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(("bad" -> Some(1.0))), validateOnly = true))
 
-  @Test
-  def testAlterClientQuotasBadConfigValue(): Unit = {
-    val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava)
-    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map(ProducerByteRateProp -> Some(10000.5)), validateOnly = true))
+    entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user")).asJava)
+    assertThrows(classOf[InvalidRequestException], () => alterEntityQuotas(entity, Map((ProducerByteRateProp -> Some(10000.5))), validateOnly = true))
   }
 
   private def expectInvalidRequestWithMessage(runnable: => Unit, expectedMessage: String): Unit = {
@@ -292,7 +281,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     assertTrue(exception.getMessage.contains(expectedMessage), s"Expected message $exception to contain $expectedMessage")
   }
 
-  @Test
+  @ClusterTest
   def testAlterClientQuotasInvalidEntityCombination(): Unit = {
     val userAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user", ClientQuotaEntity.IP -> "1.2.3.4").asJava)
     val clientAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> "client", ClientQuotaEntity.IP -> "1.2.3.4").asJava)
@@ -303,7 +292,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
       validateOnly = true), expectedExceptionMessage)
   }
 
-  @Test
+  @ClusterTest
   def testAlterClientQuotasBadIp(): Unit = {
     val invalidHostPatternEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "abc-123").asJava)
     val unresolvableHostEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "ip").asJava)
@@ -314,7 +303,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
       validateOnly = true), expectedExceptionMessage)
   }
 
-  @Test
+  @ClusterTest
   def testDescribeClientQuotasInvalidFilterCombination(): Unit = {
     val ipFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)
     val userFilterComponent = ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)
@@ -357,7 +346,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     (matchUserClientEntities ++ matchIpEntities).foreach(e => result(e._1).get(10, TimeUnit.SECONDS))
   }
 
-  @Test
+  @ClusterTest
   def testDescribeClientQuotasMatchExact(): Unit = {
     setupDescribeClientQuotasMatchTest()
 
@@ -402,7 +391,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     }
   }
 
-  @Test
+  @ClusterTest
   def testDescribeClientQuotasMatchPartial(): Unit = {
     setupDescribeClientQuotasMatchTest()
 
@@ -509,13 +498,13 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     testMatchEntities(ClientQuotaFilter.containsOnly(List.empty.asJava), 0, entity => false)
   }
 
-  @Test
+  @ClusterTest
   def testClientQuotasUnsupportedEntityTypes(): Unit = {
     val entity = new ClientQuotaEntity(Map(("other" -> "name")).asJava)
     assertThrows(classOf[UnsupportedVersionException], () => verifyDescribeEntityQuotas(entity, Map.empty))
   }
 
-  @Test
+  @ClusterTest
   def testClientQuotasSanitized(): Unit = {
     // An entity with name that must be sanitized when writing to Zookeeper.
     val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user with spaces")).asJava)
@@ -529,7 +518,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     ))
   }
 
-  @Test
+  @ClusterTest
   def testClientQuotasWithDefaultName(): Unit = {
     // An entity using the name associated with the default entity name. The entity's name should be sanitized so
     // that it does not conflict with the default entity name.
@@ -580,7 +569,9 @@ class ClientQuotasRequestTest extends BaseRequestTest {
 
   private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter): DescribeClientQuotasResponse = {
     val request = new DescribeClientQuotasRequest.Builder(filter).build()
-    connectAndReceive[DescribeClientQuotasResponse](request, destination = controllerSocketServer)
+    IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](request,
+      destination = cluster.anyControllerSocketServer(),
+      listenerName = cluster.clientListener())
   }
 
   private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String, Option[Double]], validateOnly: Boolean) =
@@ -606,7 +597,9 @@ class ClientQuotasRequestTest extends BaseRequestTest {
 
   private def sendAlterClientQuotasRequest(entries: Iterable[ClientQuotaAlteration], validateOnly: Boolean): AlterClientQuotasResponse = {
     val request = new AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly).build()
-    connectAndReceive[AlterClientQuotasResponse](request, destination = controllerSocketServer)
+    IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](request,
+      destination = cluster.anyControllerSocketServer(),
+      listenerName = cluster.clientListener())
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 0444264..bbc71ca 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -16,6 +16,8 @@
   */
 package kafka.server
 
+import integration.kafka.server.IntegrationTestUtils
+
 import java.net.Socket
 import java.util.Collections
 import kafka.api.{KafkaSasl, SaslSetup}
@@ -23,49 +25,53 @@ import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
+import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.{ClusterConfig, ClusterInstance}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.extension.ExtendWith
 
-class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with SaslSetup {
-  override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-  private val kafkaClientSaslMechanism = "PLAIN"
-  private val kafkaServerSaslMechanisms = List("PLAIN")
-  protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
-  protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
-  override def brokerCount = 1
+import scala.jdk.CollectionConverters._
 
-  @BeforeEach
-  override def setUp(): Unit = {
-    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
-    super.setUp()
-  }
 
-  @AfterEach
-  override def tearDown(): Unit = {
-    super.tearDown()
-    closeSasl()
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
+
+  val kafkaClientSaslMechanism = "PLAIN"
+  val kafkaServerSaslMechanisms = List("PLAIN")
+
+  private var sasl: SaslSetup = _
+
+  @BeforeEach
+  def setupSasl(config: ClusterConfig): Unit = {
+    sasl = new SaslSetup() {}
+    sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName))
+    config.saslServerProperties().putAll(sasl.kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
+    config.saslClientProperties().putAll(sasl.kafkaClientSaslProperties(kafkaClientSaslMechanism))
+    super.brokerPropertyOverrides(config.serverProperties())
   }
 
-  @Test
+  @ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
   def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
-    val socket = connect()
+    val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
     try {
-      val apiVersionsResponse = sendAndReceive[ApiVersionsResponse](
+      val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
-      validateApiVersionsResponse(apiVersionsResponse)
+      validateApiVersionsResponse(apiVersionsResponse, cluster.clientListener())
       sendSaslHandshakeRequestValidateResponse(socket)
     } finally {
       socket.close()
     }
   }
 
-  @Test
+  @ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
   def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
-    val socket = connect()
+    val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
     try {
       sendSaslHandshakeRequestValidateResponse(socket)
-      val response = sendAndReceive[ApiVersionsResponse](
+      val response = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
       assertEquals(Errors.ILLEGAL_SASL_STATE.code, response.data.errorCode)
     } finally {
@@ -73,26 +79,31 @@ class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with Sas
     }
   }
 
-  @Test
+  @ClusterTest(securityProtocol = SecurityProtocol.SASL_PLAINTEXT, clusterType = Type.ZK)
   def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
-    val socket = connect()
+    val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
     try {
       val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0)
       val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest)
       assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode)
-      val apiVersionsResponse2 = sendAndReceive[ApiVersionsResponse](
+      val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
-      validateApiVersionsResponse(apiVersionsResponse2)
+      validateApiVersionsResponse(apiVersionsResponse2, cluster.clientListener())
       sendSaslHandshakeRequestValidateResponse(socket)
     } finally {
       socket.close()
     }
   }
 
+  @AfterEach
+  def closeSasl(): Unit = {
+    sasl.closeSasl()
+  }
+
   private def sendSaslHandshakeRequestValidateResponse(socket: Socket): Unit = {
     val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN"),
       ApiKeys.SASL_HANDSHAKE.latestVersion)
-    val response = sendAndReceive[SaslHandshakeResponse](request, socket)
+    val response = IntegrationTestUtils.sendAndReceive[SaslHandshakeResponse](request, socket)
     assertEquals(Errors.NONE, response.error)
     assertEquals(Collections.singletonList("PLAIN"), response.enabledMechanisms)
   }


Mime
View raw message