kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6863 Kafka clients should try to use multiple DNS resolved IP (#4987)
Date Thu, 11 Oct 2018 17:14:27 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f393b2f  KAFKA-6863 Kafka clients should try to use multiple DNS resolved IP (#4987)
f393b2f is described below

commit f393b2f7dd477c3a43e70631f7036a211bf5d740
Author: Edoardo Comar <ecomar@uk.ibm.com>
AuthorDate: Thu Oct 11 18:14:17 2018 +0100

    KAFKA-6863 Kafka clients should try to use multiple DNS resolved IP (#4987)
    
    Implementation of KIP-302: Based on the new client configuration `client.dns.lookup`, a NetworkClient can use InetAddress.getAllByName to find all IPs for a host and iterate over them when a connection attempt fails. Only addresses of a single family (either IPv4 or IPv6) are used, as in the default mode.
    
    Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
    Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../java/org/apache/kafka/clients/ClientUtils.java | 27 +++++++
 .../kafka/clients/ClusterConnectionStates.java     | 37 ++++++++--
 .../apache/kafka/clients/CommonClientConfigs.java  |  4 ++
 .../org/apache/kafka/clients/NetworkClient.java    | 23 ++++--
 .../kafka/clients/admin/AdminClientConfig.java     |  7 ++
 .../kafka/clients/admin/KafkaAdminClient.java      |  3 +
 .../kafka/clients/consumer/ConsumerConfig.java     |  7 ++
 .../kafka/clients/consumer/KafkaConsumer.java      |  3 +
 .../kafka/clients/producer/KafkaProducer.java      |  3 +
 .../kafka/clients/producer/ProducerConfig.java     |  9 ++-
 .../kafka/common/config/ClientDnsLookup.java       | 40 +++++++++++
 .../org/apache/kafka/clients/ClientUtilsTest.java  | 38 ++++++++++
 .../kafka/clients/ClusterConnectionStatesTest.java | 82 +++++++++++++++++-----
 .../apache/kafka/clients/NetworkClientTest.java    |  8 ++-
 .../clients/consumer/internals/FetcherTest.java    |  3 +-
 .../clients/producer/internals/SenderTest.java     |  3 +-
 .../apache/kafka/connect/runtime/WorkerConfig.java |  9 ++-
 .../runtime/distributed/WorkerGroupMember.java     |  2 +
 core/src/main/scala/kafka/admin/AdminClient.scala  |  2 +
 .../controller/ControllerChannelManager.scala      |  2 +
 .../TransactionMarkerChannelManager.scala          |  2 +
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +
 .../kafka/server/ReplicaFetcherBlockingSend.scala  |  2 +
 .../kafka/tools/ReplicaVerificationTool.scala      |  2 +
 .../trogdor/workload/ConnectionStressWorker.java   |  3 +
 25 files changed, 289 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index fba220c..1661ea3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -27,8 +28,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -88,4 +92,27 @@ public final class ClientUtils {
         return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null,
                 clientSaslMechanism, true);
     }
+
+    static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
+        InetAddress[] addresses = InetAddress.getAllByName(host);
+        if (ClientDnsLookup.USE_ALL_DNS_IPS == clientDnsLookup) {
+            return filterPreferredAddresses(addresses);
+        } else {
+            return Collections.singletonList(addresses[0]);
+        }
+    }
+
+    static List<InetAddress> filterPreferredAddresses(InetAddress[] allAddresses) {
+        List<InetAddress> preferredAddresses = new ArrayList<>();
+        Class<? extends InetAddress> clazz = null;
+        for (InetAddress address : allAddresses) {
+            if (clazz == null) {
+                clazz = address.getClass();
+            }
+            if (clazz.isInstance(address)) {
+                preferredAddresses.add(address);
+            }
+        }
+        return preferredAddresses;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index c3a2856..b697de7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -18,9 +18,13 @@ package org.apache.kafka.clients;
 
 import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -104,18 +108,24 @@ final class ClusterConnectionStates {
      * Enter the connecting state for the given connection.
      * @param id the id of the connection
      * @param now the current time
+     * @throws UnknownHostException if the host cannot be resolved when state for a new connection is created
      */
-    public void connecting(String id, long now) {
+    public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
         if (nodeState.containsKey(id)) {
-            NodeConnectionState node = nodeState.get(id);
-            node.lastConnectAttemptMs = now;
-            node.state = ConnectionState.CONNECTING;
+            NodeConnectionState connectionState = nodeState.get(id);
+            connectionState.lastConnectAttemptMs = now;
+            connectionState.state = ConnectionState.CONNECTING;
+            connectionState.moveToNextAddress();
         } else {
             nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
-                this.reconnectBackoffInitMs));
+                this.reconnectBackoffInitMs, ClientUtils.resolve(host, clientDnsLookup)));
         }
     }
 
+    public InetAddress currentAddress(String id) {
+        return nodeState.get(id).currentAddress();
+    }
+
     /**
      * Enter the disconnected state for the given node.
      * @param id the connection we have disconnected
@@ -334,9 +344,13 @@ final class ClusterConnectionStates {
         long reconnectBackoffMs;
         // Connection is being throttled if current time < throttleUntilTimeMs.
         long throttleUntilTimeMs;
+        private final List<InetAddress> addresses;
+        private int index = 0;
 
-        public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs) {
+        public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs, 
+                List<InetAddress> addresses) {
             this.state = state;
+            this.addresses = addresses;
             this.authenticationException = null;
             this.lastConnectAttemptMs = lastConnectAttempt;
             this.failedAttempts = 0;
@@ -344,6 +358,17 @@ final class ClusterConnectionStates {
             this.throttleUntilTimeMs = 0;
         }
 
+        public InetAddress currentAddress() {
+            return addresses.get(index);
+        }
+
+        /*
+         * advances to the next resolved address, wrapping around to the first one after the last
+         */
+        public void moveToNextAddress() {
+            index = (index + 1) % addresses.size();
+        }
+
         public String toString() {
             return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")";
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index a08e5b1..b179f02 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -93,6 +93,10 @@ public class CommonClientConfigs {
                                                          + "elapses the client will resend the request if necessary or fail the request if "
                                                          + "retries are exhausted.";
 
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
+    public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
+                                                        + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers.</p>";
+
     /**
      * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
      * is explicitly configured but the maximum reconnect backoff is not explicitly configured.
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 8ec51ed..7ea05f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
@@ -44,6 +45,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -96,6 +98,8 @@ public class NetworkClient implements KafkaClient {
     /* time in ms to wait before retrying to create connection to a server */
     private final long reconnectBackoffMs;
 
+    private final ClientDnsLookup clientDnsLookup;
+
     private final Time time;
 
     /**
@@ -120,6 +124,7 @@ public class NetworkClient implements KafkaClient {
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int defaultRequestTimeoutMs,
+                         ClientDnsLookup clientDnsLookup,
                          Time time,
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions,
@@ -134,6 +139,7 @@ public class NetworkClient implements KafkaClient {
              socketSendBuffer,
              socketReceiveBuffer,
              defaultRequestTimeoutMs,
+             clientDnsLookup,
              time,
              discoverBrokerVersions,
              apiVersions,
@@ -150,6 +156,7 @@ public class NetworkClient implements KafkaClient {
             int socketSendBuffer,
             int socketReceiveBuffer,
             int defaultRequestTimeoutMs,
+            ClientDnsLookup clientDnsLookup,
             Time time,
             boolean discoverBrokerVersions,
             ApiVersions apiVersions,
@@ -165,6 +172,7 @@ public class NetworkClient implements KafkaClient {
              socketSendBuffer,
              socketReceiveBuffer,
              defaultRequestTimeoutMs,
+             clientDnsLookup,
              time,
              discoverBrokerVersions,
              apiVersions,
@@ -181,6 +189,7 @@ public class NetworkClient implements KafkaClient {
                          int socketSendBuffer,
                          int socketReceiveBuffer,
                          int defaultRequestTimeoutMs,
+                         ClientDnsLookup clientDnsLookup,
                          Time time,
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions,
@@ -195,6 +204,7 @@ public class NetworkClient implements KafkaClient {
              socketSendBuffer,
              socketReceiveBuffer,
              defaultRequestTimeoutMs,
+             clientDnsLookup,
              time,
              discoverBrokerVersions,
              apiVersions,
@@ -212,6 +222,7 @@ public class NetworkClient implements KafkaClient {
                           int socketSendBuffer,
                           int socketReceiveBuffer,
                           int defaultRequestTimeoutMs,
+                          ClientDnsLookup clientDnsLookup,
                           Time time,
                           boolean discoverBrokerVersions,
                           ApiVersions apiVersions,
@@ -243,6 +254,7 @@ public class NetworkClient implements KafkaClient {
         this.apiVersions = apiVersions;
         this.throttleTimeSensor = throttleTimeSensor;
         this.log = logContext.logger(NetworkClient.class);
+        this.clientDnsLookup = clientDnsLookup;
     }
 
     /**
@@ -862,12 +874,13 @@ public class NetworkClient implements KafkaClient {
     private void initiateConnect(Node node, long now) {
         String nodeConnectionId = node.idString();
         try {
-            log.debug("Initiating connection to node {}", node);
-            this.connectionStates.connecting(nodeConnectionId, now);
+            this.connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
+            InetAddress address = this.connectionStates.currentAddress(nodeConnectionId);
+            log.debug("Initiating connection to node {} using address {}", node, address);
             selector.connect(nodeConnectionId,
-                             new InetSocketAddress(node.host(), node.port()),
-                             this.socketSendBuffer,
-                             this.socketReceiveBuffer);
+                    new InetSocketAddress(address, node.port()),
+                    this.socketSendBuffer,
+                    this.socketReceiveBuffer);
         } catch (IOException e) {
             /* attempt failed, we'll try again after the backoff */
             connectionStates.disconnected(nodeConnectionId, now);
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 058c491..a051de2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -158,6 +159,12 @@ public class AdminClientConfig extends AbstractConfig {
                                         in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
                                         Importance.LOW,
                                         METRICS_RECORDING_LEVEL_DOC)
+                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 // security support
                                 .define(SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ceebc58..86e1447 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.StaleMetadataException;
@@ -45,6 +46,7 @@ import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -360,6 +362,7 @@ public class KafkaAdminClient extends AdminClient {
                 config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
                 config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
                 (int) TimeUnit.HOURS.toMillis(1),
+                ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
                 time,
                 true,
                 apiVersions,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ddd6e06..a1c9dc2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -452,6 +453,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
                                         Importance.MEDIUM,
                                         ISOLATION_LEVEL_DOC)
+                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 04b8ec2..e79ff07 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -35,6 +36,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -730,6 +732,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                    ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
                     time,
                     true,
                     new ApiVersions(),
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 316b024..465e60f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
@@ -48,6 +49,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -447,6 +449,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                 producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                 requestTimeoutMs,
+                ClientDnsLookup.forConfig(producerConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
                 time,
                 true,
                 apiVersions,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 6142519..f08159d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -346,7 +347,13 @@ public class ProducerConfig extends AbstractConfig {
                                         null,
                                         new ConfigDef.NonEmptyString(),
                                         Importance.LOW,
-                                        TRANSACTIONAL_ID_DOC);
+                                        TRANSACTIONAL_ID_DOC)
+                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java b/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java
new file mode 100644
index 0000000..4a013b9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.util.Locale;
+
+public enum ClientDnsLookup {
+
+    DEFAULT("default"),
+    USE_ALL_DNS_IPS("use_all_dns_ips");
+
+    private String clientDnsLookup;
+
+    ClientDnsLookup(String clientDnsLookup) {
+        this.clientDnsLookup = clientDnsLookup;
+    }
+
+    @Override
+    public String toString() {
+        return clientDnsLookup;
+    }
+
+    public static ClientDnsLookup forConfig(String config) {
+        return ClientDnsLookup.valueOf(config.toUpperCase(Locale.ROOT));
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index d19b0be..bea464f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -16,11 +16,16 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.junit.Test;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
 
@@ -49,6 +54,39 @@ public class ClientUtilsTest {
         check("some.invalid.hostname.foo.bar.local:9999");
     }
 
+    @Test
+    public void testFilterPreferredAddresses() throws UnknownHostException {
+        InetAddress ipv4 = InetAddress.getByName("192.0.0.1");
+        InetAddress ipv6 = InetAddress.getByName("::1");
+
+        InetAddress[] ipv4First = new InetAddress[]{ipv4, ipv6, ipv4};
+        List<InetAddress> result = ClientUtils.filterPreferredAddresses(ipv4First);
+        assertTrue(result.contains(ipv4));
+        assertFalse(result.contains(ipv6));
+        assertEquals(2, result.size());
+
+        InetAddress[] ipv6First = new InetAddress[]{ipv6, ipv4, ipv4};
+        result = ClientUtils.filterPreferredAddresses(ipv6First);
+        assertTrue(result.contains(ipv6));
+        assertFalse(result.contains(ipv4));
+        assertEquals(1, result.size());
+    }
+
+    @Test(expected = UnknownHostException.class)
+    public void testResolveUnknownHostException() throws UnknownHostException {
+        ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.DEFAULT);
+    }
+
+    @Test
+    public void testResolveDnsLookup() throws UnknownHostException {
+        assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.DEFAULT).size());
+    }
+
+    @Test
+    public void testResolveDnsLookupAllIps() throws UnknownHostException {
+        assertEquals(2, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size());
+    }
+
     private List<InetSocketAddress> check(String... url) {
         return ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 37155ce..d4a2a55 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -19,9 +19,15 @@ package org.apache.kafka.clients;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.Before;
@@ -35,6 +41,7 @@ public class ClusterConnectionStatesTest {
     private final double reconnectBackoffJitter = 0.2;
     private final String nodeId1 = "1001";
     private final String nodeId2 = "2002";
+    private final String hostTwoIps = "kafka.apache.org";
 
     private ClusterConnectionStates connectionStates;
 
@@ -44,11 +51,11 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testClusterConnectionStateChanges() {
+    public void testClusterConnectionStateChanges() throws UnknownHostException {
         assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
 
         // Start connecting to Node and check state
-        connectionStates.connecting(nodeId1, time.milliseconds());
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.CONNECTING);
         assertTrue(connectionStates.isConnecting(nodeId1));
         assertFalse(connectionStates.isReady(nodeId1, time.milliseconds()));
@@ -88,7 +95,7 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testMultipleNodeConnectionStates() {
+    public void testMultipleNodeConnectionStates() throws UnknownHostException {
         // Check initial state, allowed to connect to all nodes, but no nodes shown as ready
         assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
         assertTrue(connectionStates.canConnect(nodeId2, time.milliseconds()));
@@ -96,7 +103,7 @@ public class ClusterConnectionStatesTest {
 
         // Start connecting one node and check that the pool only shows ready nodes after
         // successful connect
-        connectionStates.connecting(nodeId2, time.milliseconds());
+        connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
         time.sleep(1000);
         connectionStates.ready(nodeId2);
@@ -104,7 +111,7 @@ public class ClusterConnectionStatesTest {
 
         // Connect second node and check that both are shown as ready, pool should immediately
         // show ready nodes, since node2 is already connected
-        connectionStates.connecting(nodeId1, time.milliseconds());
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         assertTrue(connectionStates.hasReadyNodes(time.milliseconds()));
         time.sleep(1000);
         connectionStates.ready(nodeId1);
@@ -126,9 +133,9 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testAuthorizationFailed() {
+    public void testAuthorizationFailed() throws UnknownHostException {
         // Try connecting
-        connectionStates.connecting(nodeId1, time.milliseconds());
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
 
         time.sleep(100);
 
@@ -147,8 +154,8 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testRemoveNode() {
-        connectionStates.connecting(nodeId1, time.milliseconds());
+    public void testRemoveNode() throws UnknownHostException {
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         time.sleep(1000);
         connectionStates.ready(nodeId1);
         time.sleep(10000);
@@ -162,9 +169,9 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testMaxReconnectBackoff() {
+    public void testMaxReconnectBackoff() throws UnknownHostException {
         long effectiveMaxReconnectBackoff = Math.round(reconnectBackoffMax * (1 + reconnectBackoffJitter));
-        connectionStates.connecting(nodeId1, time.milliseconds());
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         time.sleep(1000);
         connectionStates.disconnected(nodeId1, time.milliseconds());
 
@@ -175,14 +182,14 @@ public class ClusterConnectionStatesTest {
             assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds()));
             time.sleep(reconnectBackoff + 1);
             assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
-            connectionStates.connecting(nodeId1, time.milliseconds());
+            connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
             time.sleep(10);
             connectionStates.disconnected(nodeId1, time.milliseconds());
         }
     }
 
     @Test
-    public void testExponentialReconnectBackoff() {
+    public void testExponentialReconnectBackoff() throws UnknownHostException {
         // Calculate fixed components for backoff process
         final int reconnectBackoffExpBase = 2;
         double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
@@ -190,7 +197,7 @@ public class ClusterConnectionStatesTest {
 
         // Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt
         for (int i = 0; i < 10; i++) {
-            connectionStates.connecting(nodeId1, time.milliseconds());
+            connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
             connectionStates.disconnected(nodeId1, time.milliseconds());
             // Calculate expected backoff value without jitter
             long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
@@ -202,8 +209,8 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testThrottled() {
-        connectionStates.connecting(nodeId1, time.milliseconds());
+    public void testThrottled() throws UnknownHostException {
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         time.sleep(1000);
         connectionStates.ready(nodeId1);
         time.sleep(10000);
@@ -226,4 +233,47 @@ public class ClusterConnectionStatesTest {
         assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()),
             connectionStates.pollDelayMs(nodeId1, time.milliseconds()));
     }
+
+    @Test
+    public void testSingleIPWithDefault() throws UnknownHostException {
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        InetAddress currAddress = connectionStates.currentAddress(nodeId1);
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        assertSame(currAddress, connectionStates.currentAddress(nodeId1));
+    }
+
+    @Test
+    public void testSingleIPWithUseAll() throws UnknownHostException {
+        assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.USE_ALL_DNS_IPS).size());
+
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.USE_ALL_DNS_IPS);
+        InetAddress currAddress = connectionStates.currentAddress(nodeId1);
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.USE_ALL_DNS_IPS);
+        assertSame(currAddress, connectionStates.currentAddress(nodeId1));
+    }
+
+    @Test
+    public void testMultipleIPsWithDefault() throws UnknownHostException {
+        assertEquals(2, ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size());
+
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
+        InetAddress currAddress = connectionStates.currentAddress(nodeId1);
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
+        assertSame(currAddress, connectionStates.currentAddress(nodeId1));
+    }
+
+    @Test
+    public void testMultipleIPsWithUseAll() throws UnknownHostException {
+        assertEquals(2, ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size());
+
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
+        InetAddress addr1 = connectionStates.currentAddress(nodeId1);
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
+        InetAddress addr2 = connectionStates.currentAddress(nodeId1);
+        assertNotSame(addr1, addr2);
+
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
+        InetAddress addr3 = connectionStates.currentAddress(nodeId1);
+        assertSame(addr1, addr3);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 2876570..aaf827f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.CommonFields;
@@ -70,19 +71,20 @@ public class NetworkClientTest {
     private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
-                defaultRequestTimeoutMs, time, true, new ApiVersions(), new LogContext());
+                defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
         return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
                 "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
-                time, true, new ApiVersions(), new LogContext());
+                ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
-                64 * 1024, 64 * 1024, defaultRequestTimeoutMs, time, false, new ApiVersions(), new LogContext());
+                64 * 1024, 64 * 1024, defaultRequestTimeoutMs, 
+                ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext());
     }
 
     @Before
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 42f6beb..42deee7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
@@ -1547,7 +1548,7 @@ public class FetcherTest {
         Cluster cluster = TestUtils.singletonCluster("test", 1);
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
-                1000, 1000, 64 * 1024, 64 * 1024, 1000,
+                1000, 1000, 64 * 1024, 64 * 1024, 1000,  ClientDnsLookup.DEFAULT,
                 time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
 
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index f2a34f6..6d4d78c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
@@ -262,7 +263,7 @@ public class SenderTest {
         Cluster cluster = TestUtils.singletonCluster("test", 1);
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
-                1000, 1000, 64 * 1024, 64 * 1024, 1000,
+                1000, 1000, 64 * 1024, 64 * 1024, 1000,  ClientDnsLookup.DEFAULT,
                 time, true, new ApiVersions(), throttleTimeSensor, logContext);
 
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index c5baaa4..aac9fb2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -277,7 +278,13 @@ public class WorkerConfig extends AbstractConfig {
                         Collections.emptyList(),
                         Importance.LOW, CONFIG_PROVIDERS_DOC)
                 .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
-                        Importance.LOW, REST_EXTENSION_CLASSES_DOC);
+                        Importance.LOW, REST_EXTENSION_CLASSES_DOC)
+                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                        Type.STRING,
+                        ClientDnsLookup.DEFAULT.toString(),
+                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                        Importance.MEDIUM,
+                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);    
     }
 
     private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index aeed060..de8e8b2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -109,6 +110,7 @@ public class WorkerGroupMember {
                     config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
+                    ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
                     time,
                     true,
                     new ApiVersions(),
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 239844d..5876b6e 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
+import org.apache.kafka.common.config.ClientDnsLookup
 
 /**
   * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
@@ -452,6 +453,7 @@ object AdminClient {
       DefaultSendBufferBytes,
       DefaultReceiveBufferBytes,
       requestTimeoutMs,
+      ClientDnsLookup.DEFAULT,
       time,
       true,
       new ApiVersions,
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ecf6fbf..86b6f94 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.collection.{Set, mutable}
+import org.apache.kafka.common.config.ClientDnsLookup
 
 
 object ControllerChannelManager {
@@ -140,6 +141,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         config.requestTimeoutMs,
+        ClientDnsLookup.DEFAULT,
         time,
         false,
         new ApiVersions,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index f8b56e8..d0e765c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -36,6 +36,7 @@ import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQue
 
 import collection.JavaConverters._
 import scala.collection.{concurrent, immutable}
+import org.apache.kafka.common.config.ClientDnsLookup
 
 object TransactionMarkerChannelManager {
   def apply(config: KafkaConfig,
@@ -74,6 +75,7 @@ object TransactionMarkerChannelManager {
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       config.socketReceiveBufferBytes,
       config.requestTimeoutMs,
+      ClientDnsLookup.DEFAULT,
       time,
       false,
       new ApiVersions,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0d13d9e..841ea82 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -51,6 +51,7 @@ import org.apache.kafka.common.{ClusterResource, Node}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq, mutable}
+import org.apache.kafka.common.config.ClientDnsLookup
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -443,6 +444,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           config.requestTimeoutMs,
+          ClientDnsLookup.DEFAULT,
           time,
           false,
           new ApiVersions,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 4c7adfb..d15fdae 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Node
 import org.apache.kafka.common.requests.AbstractRequest.Builder
 
 import scala.collection.JavaConverters._
+import org.apache.kafka.common.config.ClientDnsLookup
 
 trait BlockingSend {
 
@@ -79,6 +80,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       brokerConfig.replicaSocketReceiveBufferBytes,
       brokerConfig.requestTimeoutMs,
+      ClientDnsLookup.DEFAULT,
       time,
       false,
       new ApiVersions,
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 4758764..7871015 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
+import org.apache.kafka.common.config.ClientDnsLookup
 
 /**
  * For verifying the consistency among replicas.
@@ -472,6 +473,7 @@ private class ReplicaFetcherBlockingSend(sourceNode: Node,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
       consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+      ClientDnsLookup.DEFAULT,
       time,
       false,
       new ApiVersions,
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 82d1d6c..74ab234 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.ManualMetadataUpdater;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NetworkClientUtils;
@@ -29,6 +30,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -180,6 +182,7 @@ public class ConnectionStressWorker implements TaskWorker {
                                     4096,
                                     4096,
                                     1000,
+                                    ClientDnsLookup.forConfig(conf.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
                                     Time.SYSTEM,
                                     false,
                                     new ApiVersions(),


Mime
View raw message