kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-9313: Set `use_all_dns_ips` as the new default for `client.dns.lookup` (KIP-602) (#8644)
Date Thu, 04 Jun 2020 13:26:16 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 84020bf  KAFKA-9313: Set `use_all_dns_ips` as the new default for `client.dns.lookup` (KIP-602) (#8644)
84020bf is described below

commit 84020bfc13fba37415648a3268338721949dd2d3
Author: Badai Aqrandista <badaiaqrandista@gmail.com>
AuthorDate: Thu Jun 4 23:21:52 2020 +1000

    KAFKA-9313: Set `use_all_dns_ips` as the new default for `client.dns.lookup` (KIP-602) (#8644)
    
    This applies to the producer, consumer, admin client, Connect worker
    and inter-broker communication.
    
    `ClientDnsLookup.DEFAULT` has been deprecated and a warning
    will be logged if it's explicitly set in a client config.
    
    Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../org/apache/kafka/clients/ClientDnsLookup.java  |  2 +-
 .../java/org/apache/kafka/clients/ClientUtils.java | 20 ++++++++++++++----
 .../apache/kafka/clients/CommonClientConfigs.java  | 24 +++++++++++++++++++---
 .../kafka/clients/admin/AdminClientConfig.java     |  3 ++-
 .../kafka/clients/consumer/ConsumerConfig.java     |  3 ++-
 .../kafka/clients/producer/ProducerConfig.java     |  3 ++-
 .../org/apache/kafka/clients/ClientUtilsTest.java  | 14 ++++++++++---
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  2 +-
 .../clients/consumer/internals/FetcherTest.java    |  4 ++--
 .../clients/producer/internals/SenderTest.java     |  2 +-
 .../apache/kafka/connect/runtime/WorkerConfig.java |  2 +-
 .../kafka/admin/BrokerApiVersionsCommand.scala     |  4 ++--
 .../controller/ControllerChannelManager.scala      |  2 +-
 .../TransactionMarkerChannelManager.scala          |  2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 .../kafka/server/ReplicaFetcherBlockingSend.scala  |  2 +-
 .../kafka/tools/ReplicaVerificationTool.scala      |  2 +-
 docs/upgrade.html                                  |  6 ++++++
 18 files changed, 73 insertions(+), 26 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java b/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
index 96d47c3..844f236 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
@@ -24,7 +24,7 @@ public enum ClientDnsLookup {
     USE_ALL_DNS_IPS("use_all_dns_ips"),
     RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY("resolve_canonical_bootstrap_servers_only");
 
-    private String clientDnsLookup;
+    private final String clientDnsLookup;
 
     ClientDnsLookup(String clientDnsLookup) {
         this.clientDnsLookup = clientDnsLookup;
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index bcdac45..5e5286e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -108,13 +108,25 @@ public final class ClientUtils {
 
     static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup)
throws UnknownHostException {
         InetAddress[] addresses = InetAddress.getAllByName(host);
-        if (ClientDnsLookup.USE_ALL_DNS_IPS == clientDnsLookup) {
-            return filterPreferredAddresses(addresses);
-        } else {
-            return Collections.singletonList(addresses[0]);
+
+        switch (clientDnsLookup) {
+            case DEFAULT:
+                return Collections.singletonList(addresses[0]);
+            case USE_ALL_DNS_IPS:
+            case RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY:
+                return filterPreferredAddresses(addresses);
         }
+
+        throw new IllegalStateException("Unhandled ClientDnsLookup instance: " + clientDnsLookup);
     }
 
+    /**
+     * Return a list containing the first address in `allAddresses` and subsequent addresses
+     * that are a subtype of the first address.
+     *
+     * The outcome is that all returned addresses are either IPv4 or IPv6 (InetAddress has
two
+     * subclasses: Inet4Address and Inet6Address).
+     */
     static List<InetAddress> filterPreferredAddresses(InetAddress[] allAddresses) {
         List<InetAddress> preferredAddresses = new ArrayList<>();
         Class<? extends InetAddress> clazz = null;
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 987389a6..22984db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -42,9 +42,18 @@ public class CommonClientConfigs {
                                                        + "servers (you may want more than
one, though, in case a server is down).";
 
     public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
-    public static final String CLIENT_DNS_LOOKUP_DOC = "Controls how the client uses DNS
lookups. If set to <code>use_all_dns_ips</code> then, when the lookup returns
multiple IP addresses for a hostname,"
-                                                       + " they will all be attempted to
connect to before failing the connection. Applies to both bootstrap and advertised servers."
-                                                       + " If the value is <code>resolve_canonical_bootstrap_servers_only</code>
each entry will be resolved and expanded into a list of canonical names.";
+    public static final String CLIENT_DNS_LOOKUP_DOC = "Controls how the client uses DNS
lookups. "
+                                                       + "If set to <code>use_all_dns_ips</code>,
connect to each returned IP "
+                                                       + "address in sequence until a successful
connection is established. "
+                                                       + "After a disconnection, the next
IP is used. Once all IPs have been "
+                                                       + "used once, the client resolves
the IP(s) from the hostname again "
+                                                       + "(both the JVM and the OS cache
DNS name lookups, however). "
+                                                       + "If set to <code>resolve_canonical_bootstrap_servers_only</code>,
"
+                                                       + "resolve each bootstrap address
into a list of canonical names. After "
+                                                       + "the bootstrap phase, this behaves
the same as <code>use_all_dns_ips</code>. "
+                                                       + "If set to <code>default</code>
(deprecated), attempt to connect to the "
+                                                       + "first IP address returned by the
lookup, even if the lookup returns multiple "
+                                                       + "IP addresses.";
 
     public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
     public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds
after which we force a refresh of metadata even if we haven't seen any partition leadership
changes to proactively discover any new brokers or partitions.";
@@ -167,4 +176,13 @@ public class CommonClientConfigs {
         }
         return rval;
     }
+
+    public static void warnIfDeprecatedDnsLookupValue(AbstractConfig config) {
+        String clientDnsLookupValue = config.getString(CLIENT_DNS_LOOKUP_CONFIG);
+        if (clientDnsLookupValue.equals(ClientDnsLookup.DEFAULT.toString()))
+            log.warn("Configuration '{}' with value '{}' is deprecated and will be removed
in " +
+                "future version. Please use '{}' or another non-deprecated value.",
+                CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
+                ClientDnsLookup.USE_ALL_DNS_IPS);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index ad62f1f..ee93003 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -182,7 +182,7 @@ public class AdminClientConfig extends AbstractConfig {
                                         METRICS_RECORDING_LEVEL_DOC)
                                 .define(CLIENT_DNS_LOOKUP_CONFIG,
                                         Type.STRING,
-                                        ClientDnsLookup.DEFAULT.toString(),
+                                        ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
                                         in(ClientDnsLookup.DEFAULT.toString(),
                                            ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
                                            ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
@@ -205,6 +205,7 @@ public class AdminClientConfig extends AbstractConfig {
 
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
parsedValues) {
+        CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 910c942..fbc5f41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -314,7 +314,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(CLIENT_DNS_LOOKUP_CONFIG,
                                         Type.STRING,
-                                        ClientDnsLookup.DEFAULT.toString(),
+                                        ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
                                         in(ClientDnsLookup.DEFAULT.toString(),
                                            ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
                                            ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
@@ -543,6 +543,7 @@ public class ConsumerConfig extends AbstractConfig {
 
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
parsedValues) {
+        CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
         Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this,
parsedValues);
         maybeOverrideClientId(refinedConfigs);
         return refinedConfigs;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index cb74ede..cfc9b06 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -279,7 +279,7 @@ public class ProducerConfig extends AbstractConfig {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(),
new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(CLIENT_DNS_LOOKUP_CONFIG,
                                         Type.STRING,
-                                        ClientDnsLookup.DEFAULT.toString(),
+                                        ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
                                         in(ClientDnsLookup.DEFAULT.toString(),
                                            ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
                                            ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
@@ -413,6 +413,7 @@ public class ProducerConfig extends AbstractConfig {
 
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
parsedValues) {
+        CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
         Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this,
parsedValues);
         maybeOverrideEnableIdempotence(refinedConfigs);
         maybeOverrideClientId(refinedConfigs);
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index 5dd65ed..fc9a42f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -97,21 +97,29 @@ public class ClientUtilsTest {
 
     @Test(expected = UnknownHostException.class)
     public void testResolveUnknownHostException() throws UnknownHostException {
-        ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.DEFAULT);
+        ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS);
     }
 
     @Test
     public void testResolveDnsLookup() throws UnknownHostException {
-        assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.DEFAULT).size());
+        // Note that kafka.apache.org resolves to 2 IP addresses
+        assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT).size());
     }
 
     @Test
     public void testResolveDnsLookupAllIps() throws UnknownHostException {
+        // Note that kafka.apache.org resolves to 2 IP addresses
         assertEquals(2, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size());
     }
 
+    @Test
+    public void testResolveDnsLookupResolveCanonicalBootstrapServers() throws UnknownHostException
{
+        // Note that kafka.apache.org resolves to 2 IP addresses
+        assertEquals(2, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY).size());
+    }
+
     private List<InetSocketAddress> checkWithoutLookup(String... url) {
-        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT);
+        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.USE_ALL_DNS_IPS);
     }
 
     private List<InetSocketAddress> checkWithLookup(List<String> url) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 77a9e88..ac04e5d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -272,7 +272,7 @@ public class KafkaAdminClientTest {
 
     private static Cluster mockBootstrapCluster() {
         return Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(
-                singletonList("localhost:8121"), ClientDnsLookup.DEFAULT));
+                singletonList("localhost:8121"), ClientDnsLookup.USE_ALL_DNS_IPS));
     }
 
     private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 24ebafa..c36b823 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2040,7 +2040,7 @@ public class FetcherTest {
         Cluster cluster = TestUtils.singletonCluster("test", 1);
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
-                1000, 1000, 64 * 1024, 64 * 1024, 1000,  ClientDnsLookup.DEFAULT,
+                1000, 1000, 64 * 1024, 64 * 1024, 1000,  ClientDnsLookup.USE_ALL_DNS_IPS,
                 time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
 
         ByteBuffer buffer = ApiVersionsResponse.
@@ -3487,7 +3487,7 @@ public class FetcherTest {
         TopicPartition t2p0 = new TopicPartition(topicName2, 0);
         // Expect a metadata refresh.
         metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"),
-                ClientDnsLookup.DEFAULT));
+                ClientDnsLookup.USE_ALL_DNS_IPS));
 
         Map<String, Integer> partitionNumByTopic = new HashMap<>();
         partitionNumByTopic.put(topicName, 2);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 5eb2e20..3ec8c80 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -262,7 +262,7 @@ public class SenderTest {
         Cluster cluster = TestUtils.singletonCluster("test", 1);
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
-                1000, 1000, 64 * 1024, 64 * 1024, 1000,  ClientDnsLookup.DEFAULT,
+                1000, 1000, 64 * 1024, 64 * 1024, 1000,  ClientDnsLookup.USE_ALL_DNS_IPS,
                 time, true, new ApiVersions(), throttleTimeSensor, logContext);
 
         ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 9e40e56..3217752 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -277,7 +277,7 @@ public class WorkerConfig extends AbstractConfig {
                         Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
                 .define(CLIENT_DNS_LOOKUP_CONFIG,
                         Type.STRING,
-                        ClientDnsLookup.DEFAULT.toString(),
+                        ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
                         in(ClientDnsLookup.DEFAULT.toString(),
                            ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
                            ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 006d73d..d5e8cc7 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -222,7 +222,7 @@ object BrokerApiVersionsCommand {
           CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
         .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
           Type.STRING,
-          ClientDnsLookup.DEFAULT.toString,
+          ClientDnsLookup.USE_ALL_DNS_IPS.toString,
           in(ClientDnsLookup.DEFAULT.toString,
             ClientDnsLookup.USE_ALL_DNS_IPS.toString,
             ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
@@ -296,7 +296,7 @@ object BrokerApiVersionsCommand {
         DefaultSendBufferBytes,
         DefaultReceiveBufferBytes,
         requestTimeoutMs,
-        ClientDnsLookup.DEFAULT,
+        ClientDnsLookup.USE_ALL_DNS_IPS,
         time,
         true,
         new ApiVersions,
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 8228795..2c9b738 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -153,7 +153,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         config.requestTimeoutMs,
-        ClientDnsLookup.DEFAULT,
+        ClientDnsLookup.USE_ALL_DNS_IPS,
         time,
         false,
         new ApiVersions,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 6fe2575..5bdf18a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -79,7 +79,7 @@ object TransactionMarkerChannelManager {
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       config.socketReceiveBufferBytes,
       config.requestTimeoutMs,
-      ClientDnsLookup.DEFAULT,
+      ClientDnsLookup.USE_ALL_DNS_IPS,
       time,
       false,
       new ApiVersions,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index bad56aa..e58c9f4 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -517,7 +517,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           config.requestTimeoutMs,
-          ClientDnsLookup.DEFAULT,
+          ClientDnsLookup.USE_ALL_DNS_IPS,
           time,
           false,
           new ApiVersions,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 2d847e0..5e15035 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -89,7 +89,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       brokerConfig.replicaSocketReceiveBufferBytes,
       brokerConfig.requestTimeoutMs,
-      ClientDnsLookup.DEFAULT,
+      ClientDnsLookup.USE_ALL_DNS_IPS,
       time,
       false,
       new ApiVersions,
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index bfb03da..22d33e6 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -479,7 +479,7 @@ private class ReplicaFetcherBlockingSend(sourceNode: Node,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
       consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
-      ClientDnsLookup.DEFAULT,
+      ClientDnsLookup.USE_ALL_DNS_IPS,
       time,
       false,
       new ApiVersions,
diff --git a/docs/upgrade.html b/docs/upgrade.html
index be1e342..2983cf7 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -29,6 +29,12 @@
         both support it and fallback to TLSv1.2 otherwise. See
         <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-573%3A+Enable+TLSv1.3+by+default">KIP-573</a>
for more details.
     </li>
+    <li>The default value for the <code>client.dns.lookup</code> configuration has been changed from <code>default</code>
+        to <code>use_all_dns_ips</code>. If a hostname resolves to multiple IP addresses, clients and brokers will now
+        attempt to connect to each IP in sequence until the connection is successfully established. See
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-602%3A+Change+default+value+for+client.dns.lookup">KIP-602</a>
+        for more details.
+    </li>
 </ul>
 
 <h5><a id="upgrade_250_notable" href="#upgrade_250_notable">Notable changes in
2.5.0</a></h5>


Mime
View raw message