kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9320: Enable TLSv1.3 by default (KIP-573) (#8695)
Date Tue, 02 Jun 2020 23:10:48 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 775e5a2  KAFKA-9320: Enable TLSv1.3 by default (KIP-573) (#8695)
775e5a2 is described below

commit 775e5a2ba92c917e2debc2521f759e66d648cf39
Author: Nikolay <nizhikov@apache.org>
AuthorDate: Wed Jun 3 01:34:43 2020 +0300

    KAFKA-9320: Enable TLSv1.3 by default (KIP-573) (#8695)
    
    1. Enables `TLSv1.3` by default with Java 11 or newer.
    2. Add unit tests that cover the various TLSv1.2 and TLSv1.3 combinations.
    3. Extend `benchmark_test.py` and `replication_test.py` to run with 'TLSv1.2'
    or 'TLSv1.3'.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../org/apache/kafka/common/config/SslConfigs.java |  31 +++-
 .../common/network/SslTransportLayerTest.java      |  30 +---
 .../common/network/SslTransportTls12Tls13Test.java | 169 +++++++++++++++++++
 .../network/SslVersionsTransportLayerTest.java     | 183 +++++++++++++++++++++
 .../unit/kafka/network/SocketServerTest.scala      |   2 +-
 docs/upgrade.html                                  |   7 +-
 tests/kafkatest/benchmarks/core/benchmark_test.py  |  44 ++---
 tests/kafkatest/services/kafka/kafka.py            |  21 ++-
 .../services/kafka/templates/kafka.properties      |   5 +-
 tests/kafkatest/services/kafka/util.py             |  27 +--
 tests/kafkatest/services/kafka_log4j_appender.py   |   4 +-
 tests/kafkatest/services/log_compaction_tester.py  |   5 +-
 .../services/replica_verification_tool.py          |   6 +-
 .../kafkatest/services/security/security_config.py |  19 ++-
 tests/kafkatest/tests/core/replication_test.py     |   7 +-
 tests/kafkatest/tests/core/upgrade_test.py         |   3 +-
 tests/kafkatest/utils/remote_account.py            |  19 +++
 17 files changed, 483 insertions(+), 99 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index f114e66..6dc0f58 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.config;
 
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.utils.Java;
 import org.apache.kafka.common.utils.Utils;
 
 import javax.net.ssl.KeyManagerFactory;
@@ -49,11 +50,15 @@ public class SslConfigs {
 
     public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
     public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "
-            + "Default setting is TLSv1.2, which is fine for most cases. "
-            + "Allowed values in recent JVMs are TLSv1.2 and TLSv1.3. TLS, TLSv1.1, SSL, SSLv2 and SSLv3 "
-            + "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.";
+        + "The default is 'TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. "
+        + "This value should be fine for most use cases. "
+        + "Allowed values in recent JVMs are 'TLSv1.2' and 'TLSv1.3'. 'TLS', 'TLSv1.1', 'SSL', 'SSLv2' and 'SSLv3' "
+        + "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. "
+        + "With the default value for this config and 'ssl.enabled.protocols', clients will downgrade to 'TLSv1.2' if "
+        + "the server does not support 'TLSv1.3'. If this config is set to 'TLSv1.2', clients will not use 'TLSv1.3' even "
+        + "if it is one of the values in ssl.enabled.protocols and the server only supports 'TLSv1.3'.";
 
-    public static final String DEFAULT_SSL_PROTOCOL = "TLSv1.2";
+    public static final String DEFAULT_SSL_PROTOCOL;
 
     public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
     public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";
@@ -63,8 +68,22 @@ public class SslConfigs {
             + "By default all the available cipher suites are supported.";
 
     public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
-    public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections.";
-    public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2";
+    public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
+        + "The default is 'TLSv1.2,TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. With the "
+        + "default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fallback "
+        + "to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most "
+        + "cases. Also see the config documentation for `ssl.protocol`.";
+    public static final String DEFAULT_SSL_ENABLED_PROTOCOLS;
+
+    static {
+        if (Java.IS_JAVA11_COMPATIBLE) {
+            DEFAULT_SSL_PROTOCOL = "TLSv1.3";
+            DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.3";
+        } else {
+            DEFAULT_SSL_PROTOCOL = "TLSv1.2";
+            DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2";
+        }
+    }
 
     public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
     public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 0e793b0..ac94817 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -41,7 +41,6 @@ import org.junit.runners.Parameterized;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
-import javax.net.ssl.SSLServerSocketFactory;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -578,26 +577,6 @@ public class SslTransportLayerTest {
         server.verifyAuthenticationMetrics(1, 2);
     }
 
-    @Test
-    public void testUnsupportedCipher() throws Exception {
-        String[] cipherSuites = ((SSLServerSocketFactory) SSLServerSocketFactory.getDefault()).getSupportedCipherSuites();
-        if (cipherSuites != null && cipherSuites.length > 1) {
-            sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
-            sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuites[0]));
-            sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
-            sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuites[1]));
-
-            server = createEchoServer(SecurityProtocol.SSL);
-            createSelector(sslClientConfigs);
-
-            checkAuthentiationFailed("1", "TLSv1.1");
-            server.verifyAuthenticationMetrics(0, 1);
-
-            checkAuthentiationFailed("2", "TLSv1");
-            server.verifyAuthenticationMetrics(0, 2);
-        }
-    }
-
     /** Checks connection failed using the specified {@code tlsVersion}. */
     private void checkAuthentiationFailed(String node, String tlsVersion) throws IOException {
         sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(tlsVersion));
@@ -627,7 +606,6 @@ public class SslTransportLayerTest {
      */
     @Test
     public void testUnsupportedCiphers() throws Exception {
-        String node = "0";
         SSLContext context = SSLContext.getInstance(tlsProtocol);
         context.init(null, null, null);
         String[] cipherSuites = context.getDefaultSSLParameters().getCipherSuites();
@@ -636,10 +614,8 @@ public class SslTransportLayerTest {
 
         sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
         createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        checkAuthentiationFailed("1", tlsProtocol);
         server.verifyAuthenticationMetrics(0, 1);
     }
 
@@ -1250,7 +1226,7 @@ public class SslTransportLayerTest {
         void run() throws IOException;
     }
 
-    private static class TestSslChannelBuilder extends SslChannelBuilder {
+    static class TestSslChannelBuilder extends SslChannelBuilder {
 
         private Integer netReadBufSizeOverride;
         private Integer netWriteBufSizeOverride;
@@ -1361,7 +1337,7 @@ public class SslTransportLayerTest {
             }
         }
 
-        private static class ResizeableBufferSize {
+        static class ResizeableBufferSize {
             private Integer bufSizeOverride;
             ResizeableBufferSize(Integer bufSizeOverride) {
                 this.bufSizeOverride = bufSizeOverride;
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportTls12Tls13Test.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportTls12Tls13Test.java
new file mode 100644
index 0000000..81b86d4
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportTls12Tls13Test.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Java;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assume.assumeTrue;
+
+public class SslTransportTls12Tls13Test {
+    private static final int BUFFER_SIZE = 4 * 1024;
+    private static final Time TIME = Time.SYSTEM;
+
+    private NioEchoServer server;
+    private Selector selector;
+    private Map<String, Object> sslClientConfigs;
+    private Map<String, Object> sslServerConfigs;
+
+    @Before
+    public void setup() throws Exception {
+        // Create certificates for use by client and server. Add server cert to client truststore and vice versa.
+        CertStores serverCertStores = new CertStores(true, "server", "localhost");
+        CertStores clientCertStores = new CertStores(false, "client", "localhost");
+        sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
+        sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
+
+        LogContext logContext = new LogContext();
+        ChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false, logContext);
+        channelBuilder.configure(sslClientConfigs);
+        this.selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, logContext);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (selector != null)
+            this.selector.close();
+        if (server != null)
+            this.server.close();
+    }
+
+    /**
+     * Tests that connections fail if TLSv1.3 is enabled but a cipher suite suitable only for TLSv1.2 is used.
+     */
+    @Test
+    public void testCiphersSuiteForTls12FailsForTls13() throws Exception {
+        assumeTrue(Java.IS_JAVA11_COMPATIBLE);
+
+        String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
+
+        sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList("TLSv1.3"));
+        sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
+        server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
+            SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME);
+
+        sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList("TLSv1.3"));
+        sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
+
+        checkAuthentiationFailed();
+    }
+
+    /**
+     * Tests that connections can't be made if server uses TLSv1.2 with custom cipher suite and client uses TLSv1.3.
+     */
+    @Test
+    public void testCiphersSuiteFailForServerTls12ClientTls13() throws Exception {
+        assumeTrue(Java.IS_JAVA11_COMPATIBLE);
+
+        String tls12CipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
+        String tls13CipherSuite = "TLS_AES_128_GCM_SHA256";
+
+        sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList("TLSv1.2"));
+        sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(tls12CipherSuite));
+        server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
+            SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME);
+
+        sslClientConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3");
+        sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(tls13CipherSuite));
+
+        checkAuthentiationFailed();
+    }
+
+    /**
+     * Tests that connections can be made with TLSv1.3 cipher suite.
+     */
+    @Test
+    public void testCiphersSuiteForTls13() throws Exception {
+        assumeTrue(Java.IS_JAVA11_COMPATIBLE);
+
+        String cipherSuite = "TLS_AES_128_GCM_SHA256";
+
+        sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
+        server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
+            SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME);
+
+        sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
+        checkAuthenticationSucceed();
+    }
+
+    /**
+     * Tests that connections can be made with TLSv1.2 cipher suite.
+     */
+    @Test
+    public void testCiphersSuiteForTls12() throws Exception {
+        String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
+
+        sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(",")));
+        sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
+        server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
+            SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME);
+
+        sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(",")));
+        sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
+        checkAuthenticationSucceed();
+    }
+
+    /** Checks that the connection fails authentication when the client enables only TLSv1.3. */
+    private void checkAuthentiationFailed() throws IOException, InterruptedException {
+        sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3"));
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+        selector.connect("0", addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        NetworkTestUtils.waitForChannelClose(selector, "0", ChannelState.State.AUTHENTICATION_FAILED);
+        server.verifyAuthenticationMetrics(0, 1);
+    }
+
+    private void checkAuthenticationSucceed() throws IOException, InterruptedException {
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+        selector.connect("0", addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.waitForChannelReady(selector, "0");
+        server.verifyAuthenticationMetrics(1, 0);
+    }
+
+    private void createSelector(Map<String, Object> sslClientConfigs) {
+        SslTransportLayerTest.TestSslChannelBuilder channelBuilder = new SslTransportLayerTest.TestSslChannelBuilder(Mode.CLIENT);
+        channelBuilder.configureBufferSizes(null, null, null);
+        channelBuilder.configure(sslClientConfigs);
+        this.selector = new Selector(100 * 5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java
new file mode 100644
index 0000000..9f930a7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Java;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for the SSL transport layer.
+ * Checks different versions of the protocol usage on the server and client.
+ */
+@RunWith(value = Parameterized.class)
+public class SslVersionsTransportLayerTest {
+    private static final int BUFFER_SIZE = 4 * 1024;
+    private static final Time TIME = Time.SYSTEM;
+
+    private final List<String> serverProtocols;
+    private final List<String> clientProtocols;
+
+    @Parameterized.Parameters(name = "tlsServerProtocol={0},tlsClientProtocol={1}")
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<>();
+
+        values.add(new Object[] {Collections.singletonList("TLSv1.2"), Collections.singletonList("TLSv1.2")});
+
+        if (Java.IS_JAVA11_COMPATIBLE) {
+            values.add(new Object[] {Collections.singletonList("TLSv1.2"), Collections.singletonList("TLSv1.3")});
+            values.add(new Object[] {Collections.singletonList("TLSv1.3"), Collections.singletonList("TLSv1.2")});
+            values.add(new Object[] {Collections.singletonList("TLSv1.3"), Collections.singletonList("TLSv1.3")});
+            values.add(new Object[] {Collections.singletonList("TLSv1.2"), Arrays.asList("TLSv1.2", "TLSv1.3")});
+            values.add(new Object[] {Collections.singletonList("TLSv1.2"), Arrays.asList("TLSv1.3", "TLSv1.2")});
+            values.add(new Object[] {Collections.singletonList("TLSv1.3"), Arrays.asList("TLSv1.2", "TLSv1.3")});
+            values.add(new Object[] {Collections.singletonList("TLSv1.3"), Arrays.asList("TLSv1.3", "TLSv1.2")});
+            values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Collections.singletonList("TLSv1.3")});
+            values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Collections.singletonList("TLSv1.2")});
+            values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Arrays.asList("TLSv1.2", "TLSv1.3")});
+            values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Arrays.asList("TLSv1.3", "TLSv1.2")});
+            values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Collections.singletonList("TLSv1.3")});
+            values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Collections.singletonList("TLSv1.2")});
+            values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Arrays.asList("TLSv1.2", "TLSv1.3")});
+            values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Arrays.asList("TLSv1.3", "TLSv1.2")});
+        }
+        return values;
+    }
+
+    /**
+     * Be aware that you can turn on debug mode for a javax.net.ssl library with the line {@code System.setProperty("javax.net.debug", "ssl:handshake");}
+     * @param serverProtocols Server protocols.
+     * @param clientProtocols Client protocols.
+     */
+    public SslVersionsTransportLayerTest(List<String> serverProtocols, List<String> clientProtocols) {
+        this.serverProtocols = serverProtocols;
+        this.clientProtocols = clientProtocols;
+    }
+
+    /**
+     * Tests that the connection succeeds or fails as expected for the given server and client TLS protocol lists.
+     */
+    @Test
+    public void testTlsDefaults() throws Exception {
+        // Create certificates for use by client and server. Add server cert to client truststore and vice versa.
+        CertStores serverCertStores = new CertStores(true, "server",  "localhost");
+        CertStores clientCertStores = new CertStores(false, "client", "localhost");
+
+        Map<String, Object> sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores, clientProtocols);
+        Map<String, Object> sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores, serverProtocols);
+
+        NioEchoServer server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
+            SecurityProtocol.SSL,
+            new TestSecurityConfig(sslServerConfigs),
+            null,
+            TIME);
+        Selector selector = createClientSelector(sslClientConfigs);
+
+        String node = "0";
+        selector.connect(node, new InetSocketAddress("localhost", server.port()), BUFFER_SIZE, BUFFER_SIZE);
+
+        if (isCompatible(serverProtocols, clientProtocols)) {
+            NetworkTestUtils.waitForChannelReady(selector, node);
+
+            int msgSz = 1024 * 1024;
+            String message = TestUtils.randomString(msgSz);
+            selector.send(new NetworkSend(node, ByteBuffer.wrap(message.getBytes())));
+            while (selector.completedReceives().isEmpty()) {
+                selector.poll(100L);
+            }
+            int totalBytes = msgSz + 4; // including 4-byte size
+            server.waitForMetric("incoming-byte", totalBytes);
+            server.waitForMetric("outgoing-byte", totalBytes);
+            server.waitForMetric("request", 1);
+            server.waitForMetric("response", 1);
+        } else {
+            NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+            server.verifyAuthenticationMetrics(0, 1);
+        }
+    }
+
+    /**
+     * <p>
+     * The explanation of this check lies in the structure of the ClientHello SSL message.
+     * Please, take a look at the <a href="https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-4D421910-C36D-40A2-8BA2-7D42CCBED3C6">Guide</a>,
+     * "Send ClientHello Message" section.
+     * <p>
+     * > Client version: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 uses the extension supported_versions and not this field to negotiate protocol version
+     * ...
+     * > supported_versions: Lists which versions of TLS the client supports. In particular, if the client
+     * > requests TLS 1.3, then the client version field has the value TLSv1.2 and this extension
+     * > contains the value TLSv1.3; if the client requests TLS 1.2, then the client version field has the
+     * > value TLSv1.2 and this extension either doesn’t exist or contains the value TLSv1.2 but not the value TLSv1.3.
+     * <p>
+     *
+     * This means that a TLSv1.3 client can fall back to TLSv1.2 but a TLSv1.2 client can't change protocol to TLSv1.3.
+     *
+     * @param serverProtocols Server protocols. Expected to be non empty.
+     * @param clientProtocols Client protocols. Expected to be non empty.
+     * @return {@code true} if client should be able to connect to the server.
+     */
+    private boolean isCompatible(List<String> serverProtocols, List<String> clientProtocols) {
+        assertNotNull(serverProtocols);
+        assertFalse(serverProtocols.isEmpty());
+        assertNotNull(clientProtocols);
+        assertFalse(clientProtocols.isEmpty());
+
+        return serverProtocols.contains(clientProtocols.get(0)) ||
+            (clientProtocols.get(0).equals("TLSv1.3") && !Collections.disjoint(serverProtocols, clientProtocols));
+    }
+
+    private static Map<String, Object> getTrustingConfig(CertStores certStores, CertStores peerCertStores, List<String> tlsProtocols) {
+        Map<String, Object> configs = certStores.getTrustingConfig(peerCertStores);
+        configs.putAll(sslConfig(tlsProtocols));
+        return configs;
+    }
+
+    private static Map<String, Object> sslConfig(List<String> tlsProtocols) {
+        Map<String, Object> sslConfig = new HashMap<>();
+        sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocols.get(0));
+        sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsProtocols);
+        return sslConfig;
+    }
+
+    private Selector createClientSelector(Map<String, Object> sslClientConfigs) {
+        SslTransportLayerTest.TestSslChannelBuilder channelBuilder =
+            new SslTransportLayerTest.TestSslChannelBuilder(Mode.CLIENT);
+        channelBuilder.configureBufferSizes(null, null, null);
+        channelBuilder.configure(sslClientConfigs);
+        return new Selector(100 * 5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext());
+    }
+}
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 8f1091e..10e4d28 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -170,7 +170,7 @@ class SocketServerTest {
   }
 
   private def sslClientSocket(port: Int): Socket = {
-    val sslContext = SSLContext.getInstance("TLSv1.2")
+    val sslContext = SSLContext.getInstance(TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS)
     sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom())
     val socketFactory = sslContext.getSocketFactory
     val socket = socketFactory.createSocket("localhost", port)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 103fb99..be1e342 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -23,7 +23,12 @@
 <ul>
     <li>Kafka Streams adds a new processing mode (requires broker 2.5 or newer) that improves application
         scalability using exactly-once guarantees
-        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">KIP-447</a></li>
+        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">KIP-447</a>)
+    </li>
+    <li>TLSv1.3 has been enabled by default for Java 11 or newer. The client and server will negotiate TLSv1.3 if
+        both support it and fall back to TLSv1.2 otherwise. See
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-573%3A+Enable+TLSv1.3+by+default">KIP-573</a> for more details.
+    </li>
 </ul>
 
 <h5><a id="upgrade_250_notable" href="#upgrade_250_notable">Notable changes in 2.5.0</a></h5>
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 2b4ff87..6bab304 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -55,12 +55,12 @@ class Benchmark(Test):
     def setUp(self):
         self.zk.start()
 
-    def start_kafka(self, security_protocol, interbroker_security_protocol, version):
+    def start_kafka(self, security_protocol, interbroker_security_protocol, version, tls_version=None):
         self.kafka = KafkaService(
             self.test_context, self.num_brokers,
             self.zk, security_protocol=security_protocol,
             interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
-            version=version)
+            version=version, tls_version=tls_version)
         self.kafka.log_level = "INFO"  # We don't DEBUG logging here
         self.kafka.start()
 
@@ -68,11 +68,12 @@ class Benchmark(Test):
     @parametrize(acks=1, topic=TOPIC_REP_ONE)
     @parametrize(acks=1, topic=TOPIC_REP_THREE)
     @parametrize(acks=-1, topic=TOPIC_REP_THREE)
-    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL'])
+    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['SSL'], tls_version=['TLSv1.2', 'TLSv1.3'])
+    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT'])
     @cluster(num_nodes=7)
     @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
     def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
-                                 compression_type="none", security_protocol='PLAINTEXT', client_version=str(DEV_BRANCH),
+                                 compression_type="none", security_protocol='PLAINTEXT', tls_version=None, client_version=str(DEV_BRANCH),
                                  broker_version=str(DEV_BRANCH)):
         """
         Setup: 1 node zk + 3 node kafka cluster
@@ -85,7 +86,7 @@ class Benchmark(Test):
         client_version = KafkaVersion(client_version)
         broker_version = KafkaVersion(broker_version)
         self.validate_versions(client_version, broker_version)
-        self.start_kafka(security_protocol, security_protocol, broker_version)
+        self.start_kafka(security_protocol, security_protocol, broker_version, tls_version)
         # Always generate the same total amount of data
         nrecords = int(self.target_data_size / message_size)
 
@@ -101,9 +102,10 @@ class Benchmark(Test):
         return compute_aggregate_throughput(self.producer)
 
     @cluster(num_nodes=5)
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
-    def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
+    @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"])
+    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"])
+    def test_long_term_producer_throughput(self, compression_type="none",
+                                           security_protocol='PLAINTEXT', tls_version=None,
                                            interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
                                            broker_version=str(DEV_BRANCH)):
         """
@@ -119,7 +121,7 @@ class Benchmark(Test):
         self.validate_versions(client_version, broker_version)
         if interbroker_security_protocol is None:
             interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
+        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version)
         self.producer = ProducerPerformanceService(
             self.test_context, 1, self.kafka,
             topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
@@ -157,11 +159,11 @@ class Benchmark(Test):
         return data
 
     @cluster(num_nodes=5)
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
+    @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"])
+    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"])
     @cluster(num_nodes=6)
     @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
-    def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
+    def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None,
                                 interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
                                 broker_version=str(DEV_BRANCH)):
         """
@@ -178,7 +180,7 @@ class Benchmark(Test):
         self.validate_versions(client_version, broker_version)
         if interbroker_security_protocol is None:
             interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
+        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version)
         self.logger.info("BENCHMARK: End to end latency")
         self.perf = EndToEndLatencyService(
             self.test_context, 1, self.kafka,
@@ -189,9 +191,9 @@ class Benchmark(Test):
         return latency(self.perf.results[0]['latency_50th_ms'],  self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
 
     @cluster(num_nodes=6)
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
-    def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT",
+    @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"])
+    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"])
+    def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None,
                                    interbroker_security_protocol=None,
                                    client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
         """
@@ -207,7 +209,7 @@ class Benchmark(Test):
         self.validate_versions(client_version, broker_version)
         if interbroker_security_protocol is None:
             interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
+        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version)
         num_records = 10 * 1000 * 1000  # 10e6
 
         self.producer = ProducerPerformanceService(
@@ -236,9 +238,9 @@ class Benchmark(Test):
         return data
 
     @cluster(num_nodes=6)
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
-    def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT",
+    @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"])
+    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"])
+    def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None,
                                  interbroker_security_protocol=None, num_consumers=1,
                                  client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
         """
@@ -250,7 +252,7 @@ class Benchmark(Test):
         self.validate_versions(client_version, broker_version)
         if interbroker_security_protocol is None:
             interbroker_security_protocol = security_protocol
-        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
+        self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version)
         num_records = 10 * 1000 * 1000  # 10e6
 
         # seed kafka w/messages
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index ec44fc9..58bd47a 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import collections
 import json
 import os.path
 import re
@@ -31,7 +30,7 @@ from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.security.minikdc import MiniKdc
 from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2
+from kafkatest.version import DEV_BRANCH
 from kafkatest.services.kafka.util import fix_opts_for_new_jvm
 
 
@@ -95,17 +94,20 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
+    def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT,
+                 interbroker_security_protocol=SecurityConfig.PLAINTEXT,
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
                  jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
                  zk_client_secure=False,
-                 listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None, extra_kafka_opts=""):
+                 listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None,
+                 extra_kafka_opts="", tls_version=None):
         """
         :param context: test context
         :param ZookeeperService zk:
         :param dict topics: which topics to create automatically
         :param str security_protocol: security protocol for clients to use
+        :param str tls_version: version of the TLS protocol.
         :param str interbroker_security_protocol: security protocol to use for broker-to-broker communication
         :param str client_sasl_mechanism: sasl mechanism for clients to use
         :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication
@@ -129,6 +131,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.zk = zk
 
         self.security_protocol = security_protocol
+        self.tls_version = tls_version
         self.client_sasl_mechanism = client_sasl_mechanism
         self.topics = topics
         self.minikdc = None
@@ -215,7 +218,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                                 zk_sasl=self.zk.zk_sasl, zk_tls=self.zk_client_secure,
                                 client_sasl_mechanism=self.client_sasl_mechanism,
                                 interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
-                                listener_security_config=self.listener_security_config)
+                                listener_security_config=self.listener_security_config,
+                                tls_version=self.tls_version)
         for port in self.port_mappings.values():
             if port.open:
                 config.enable_security_protocol(port.security_protocol)
@@ -354,15 +358,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
     def start_node(self, node, timeout_sec=60):
         node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
+
+        self.security_config.setup_node(node)
+        self.security_config.setup_credentials(node, self.path, self.zk_connect_setting(), broker=True)
+
         prop_file = self.prop_file(node)
         self.logger.info("kafka.properties:")
         self.logger.info(prop_file)
         node.account.create_file(KafkaService.CONFIG_FILE, prop_file)
         node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
 
-        self.security_config.setup_node(node)
-        self.security_config.setup_credentials(node, self.path, self.zk_connect_setting(), broker=True)
-
         cmd = self.start_cmd(node)
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
         with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 9795eac..0c028aa 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -44,7 +44,10 @@ listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }}
 {% endif %}
 {% endfor %}
 {% endif %}
-
+{% if security_config.tls_version is not none %}
+ssl.enabled.protocols={{ security_config.tls_version }}
+ssl.protocol={{ security_config.tls_version }}
+{% endif %}
 ssl.keystore.location=/mnt/security/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
 ssl.key.password=test-ks-passwd
diff --git a/tests/kafkatest/services/kafka/util.py b/tests/kafkatest/services/kafka/util.py
index 92d59c7..8782ebe 100644
--- a/tests/kafkatest/services/kafka/util.py
+++ b/tests/kafkatest/services/kafka/util.py
@@ -13,16 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os.path
-
 from collections import namedtuple
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0
-from kafkatest.directory_layout.kafka_path import create_path_resolver
+
+from kafkatest.utils.remote_account import java_version
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0
 
 TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
 
 new_jdk_not_supported = frozenset([str(LATEST_0_8_2), str(LATEST_0_9), str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0)])
 
+
 def fix_opts_for_new_jvm(node):
     # Startup scripts for early versions of Kafka contains options
     # that not supported on latest versions of JVM like -XX:+PrintGCDateStamps or -XX:UseParNewGC.
@@ -38,21 +38,4 @@ def fix_opts_for_new_jvm(node):
         cmd += "export KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true\"; "
     return cmd
 
-def java_version(node):
-    # Determine java version on the node
-    version = -1
-    for line in node.account.ssh_capture("java -version"):
-        if line.find("version") != -1:
-            version = parse_version_str(line)
-    return version
-
-def parse_version_str(line):
-    # Parse java version string. Examples:
-    #`openjdk version "11.0.5" 2019-10-15` will return 11.
-    #`java version "1.5.0"` will return 5.
-    line = line[line.find('version \"') + 9:]
-    dot_pos = line.find(".")
-    if line[:dot_pos] == "1":
-        return int(line[dot_pos+1:line.find(".", dot_pos+1)])
-    else:
-        return int(line[:dot_pos])
+
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index 7ada9c5..1212a7d 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -28,14 +28,14 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
             "collect_default": False}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, security_protocol="PLAINTEXT"):
+    def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, security_protocol="PLAINTEXT", tls_version=None):
         super(KafkaLog4jAppender, self).__init__(context, num_nodes)
 
         self.kafka = kafka
         self.topic = topic
         self.max_messages = max_messages
         self.security_protocol = security_protocol
-        self.security_config = SecurityConfig(self.context, security_protocol)
+        self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version)
         self.stop_timeout_sec = 30
 
         for node in self.nodes:
diff --git a/tests/kafkatest/services/log_compaction_tester.py b/tests/kafkatest/services/log_compaction_tester.py
index 4a19650..cc6bf4f 100644
--- a/tests/kafkatest/services/log_compaction_tester.py
+++ b/tests/kafkatest/services/log_compaction_tester.py
@@ -33,12 +33,13 @@ class LogCompactionTester(KafkaPathResolverMixin, BackgroundThreadService):
             "collect_default": True}
     }
 
-    def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30):
+    def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30, tls_version=None):
         super(LogCompactionTester, self).__init__(context, 1)
 
         self.kafka = kafka
         self.security_protocol = security_protocol
-        self.security_config = SecurityConfig(self.context, security_protocol)
+        self.tls_version = tls_version
+        self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version)
         self.stop_timeout_sec = stop_timeout_sec
         self.log_compaction_completed = False
 
diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py
index 8751797..13a1288 100644
--- a/tests/kafkatest/services/replica_verification_tool.py
+++ b/tests/kafkatest/services/replica_verification_tool.py
@@ -29,14 +29,16 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
             "collect_default": False}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT", stop_timeout_sec=30):
+    def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT",
+                 stop_timeout_sec=30, tls_version=None):
         super(ReplicaVerificationTool, self).__init__(context, num_nodes)
 
         self.kafka = kafka
         self.topic = topic
         self.report_interval_ms = report_interval_ms
         self.security_protocol = security_protocol
-        self.security_config = SecurityConfig(self.context, security_protocol)
+        self.tls_version = tls_version
+        self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version)
         self.partition_lag = {}
         self.stop_timeout_sec = stop_timeout_sec
 
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index f70d7d2..2fb4f47 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -19,10 +19,13 @@ import subprocess
 from tempfile import mkdtemp
 from shutil import rmtree
 from ducktape.template import TemplateRenderer
+
 from kafkatest.services.security.minikdc import MiniKdc
 from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
 import itertools
 
+from kafkatest.utils.remote_account import java_version
+
 
 class SslStores(object):
     def __init__(self, local_scratch_dir, logger=None):
@@ -140,7 +143,7 @@ class SecurityConfig(TemplateRenderer):
     def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
                  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
                  zk_sasl=False, zk_tls=False, template_props="", static_jaas_conf=True, jaas_override_variables=None,
-                 listener_security_config=ListenerSecurityConfig()):
+                 listener_security_config=ListenerSecurityConfig(), tls_version=None):
         """
         Initialize the security properties for the node and copy
         keystore and truststore to the remote node if the transport protocol 
@@ -186,6 +189,10 @@ class SecurityConfig(TemplateRenderer):
             'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
             'sasl.kerberos.service.name' : 'kafka'
         }
+
+        if tls_version is not None:
+            self.properties.update({'tls.version' : tls_version})
+
         self.properties.update(self.listener_security_config.client_listener_overrides)
         self.jaas_override_variables = jaas_override_variables or {}
 
@@ -201,7 +208,8 @@ class SecurityConfig(TemplateRenderer):
                               template_props=template_props,
                               static_jaas_conf=static_jaas_conf,
                               jaas_override_variables=jaas_override_variables,
-                              listener_security_config=self.listener_security_config)
+                              listener_security_config=self.listener_security_config,
+                              tls_version=self.tls_version)
 
     def enable_security_protocol(self, security_protocol):
         self.has_sasl = self.has_sasl or self.is_sasl(security_protocol)
@@ -259,6 +267,9 @@ class SecurityConfig(TemplateRenderer):
         if self.has_sasl:
             self.setup_sasl(node)
 
+        if java_version(node) <= 11 and self.properties.get('tls.version') == 'TLSv1.3':
+            self.properties.update({'tls.version': 'TLSv1.2'})
+
     def setup_credentials(self, node, path, zk_connect, broker):
         if broker:
             self.maybe_create_scram_credentials(node, zk_connect, path, self.interbroker_sasl_mechanism,
@@ -304,6 +315,10 @@ class SecurityConfig(TemplateRenderer):
         return self.properties['security.protocol']
 
     @property
+    def tls_version(self):
+        return self.properties.get('tls.version')
+
+    @property
     def client_sasl_mechanism(self):
         return self.properties['sasl.mechanism']
 
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index f5c6422..01ef34f 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -125,10 +125,10 @@ class ReplicationTest(EndToEndTest):
             broker_type="leader",
             security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512")
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"])
+            security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"])
     def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type,
                                              client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI",
-                                             compression_type=None, enable_idempotence=False):
+                                             compression_type=None, enable_idempotence=False, tls_version=None):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
@@ -149,7 +149,8 @@ class ReplicationTest(EndToEndTest):
                           security_protocol=security_protocol,
                           interbroker_security_protocol=security_protocol,
                           client_sasl_mechanism=client_sasl_mechanism,
-                          interbroker_sasl_mechanism=interbroker_sasl_mechanism)
+                          interbroker_sasl_mechanism=interbroker_sasl_mechanism,
+                          tls_version=tls_version)
         self.kafka.start()
 
         compression_types = None if not compression_type else [compression_type]
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index f5ec08c..6c0e447 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -23,8 +23,9 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
+from kafkatest.utils.remote_account import java_version
 from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
-from kafkatest.services.kafka.util import java_version, new_jdk_not_supported
+from kafkatest.services.kafka.util import new_jdk_not_supported
 
 class TestUpgrade(ProduceConsumeValidateTest):
 
diff --git a/tests/kafkatest/utils/remote_account.py b/tests/kafkatest/utils/remote_account.py
index d6ea72f..b572e06 100644
--- a/tests/kafkatest/utils/remote_account.py
+++ b/tests/kafkatest/utils/remote_account.py
@@ -37,3 +37,22 @@ def line_count(node, file):
         raise Exception("Expected single line of output from wc -l")
 
     return int(out[0].strip().split(" ")[0])
+
+def java_version(node):
+    # Determine java version on the node
+    version = -1
+    for line in node.account.ssh_capture("java -version"):
+        if line.find("version") != -1:
+            version = parse_version_str(line)
+    return version
+
+def parse_version_str(line):
+    # Parse java version string. Examples:
+    #`openjdk version "11.0.5" 2019-10-15` will return 11.
+    #`java version "1.5.0"` will return 5.
+    line = line[line.find('version \"') + 9:]
+    dot_pos = line.find(".")
+    if line[:dot_pos] == "1":
+        return int(line[dot_pos+1:line.find(".", dot_pos+1)])
+    else:
+        return int(line[:dot_pos])


Mime
View raw message