kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-6195: Resolve DNS aliases in bootstrap.server (KIP-235) (#4485)
Date Sat, 13 Oct 2018 20:56:13 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 14ce380  KAFKA-6195: Resolve DNS aliases in bootstrap.server (KIP-235) (#4485)
14ce380 is described below

commit 14ce380749ed434bf5e07ba2233e182b9326b27e
Author: jonathanskrzypek <jonathan.skrzypek@gs.com>
AuthorDate: Sat Oct 13 21:39:35 2018 +0100

    KAFKA-6195: Resolve DNS aliases in bootstrap.server (KIP-235) (#4485)
    
    Adds `client.dns.lookup=resolve_canonical_bootstrap_servers_only` option to perform full dns resolution of bootstrap addresses
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Sriharsha Chintalapani <sriharsha@apache.org>, Edoardo Comar <ecomar@uk.ibm.com>, Mickael Maison <mickael.maison@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../config => clients}/ClientDnsLookup.java        |  5 ++-
 .../java/org/apache/kafka/clients/ClientUtils.java | 32 +++++++++++----
 .../kafka/clients/ClusterConnectionStates.java     |  1 -
 .../apache/kafka/clients/CommonClientConfigs.java  |  8 ++--
 .../org/apache/kafka/clients/NetworkClient.java    |  1 -
 .../kafka/clients/admin/AdminClientConfig.java     | 16 ++++++--
 .../kafka/clients/admin/KafkaAdminClient.java      |  8 ++--
 .../kafka/clients/consumer/ConsumerConfig.java     | 21 ++++++----
 .../kafka/clients/consumer/KafkaConsumer.java      | 11 ++---
 .../kafka/clients/producer/KafkaProducer.java      | 11 +++--
 .../kafka/clients/producer/ProducerConfig.java     | 21 ++++++----
 .../org/apache/kafka/clients/ClientUtilsTest.java  | 47 ++++++++++++++++------
 .../kafka/clients/ClusterConnectionStatesTest.java |  1 -
 .../apache/kafka/clients/NetworkClientTest.java    |  1 -
 .../clients/consumer/internals/FetcherTest.java    |  4 +-
 .../clients/producer/internals/SenderTest.java     |  2 +-
 .../apache/kafka/connect/runtime/WorkerConfig.java | 21 ++++++----
 .../runtime/distributed/WorkerGroupMember.java     |  6 ++-
 core/src/main/scala/kafka/admin/AdminClient.scala  | 13 +++++-
 .../controller/ControllerChannelManager.scala      |  2 -
 .../TransactionMarkerChannelManager.scala          |  1 -
 core/src/main/scala/kafka/server/KafkaServer.scala |  3 +-
 .../kafka/server/ReplicaFetcherBlockingSend.scala  |  1 -
 .../kafka/tools/ReplicaVerificationTool.scala      |  1 -
 .../trogdor/workload/ConnectionStressWorker.java   |  8 ++--
 25 files changed, 158 insertions(+), 88 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java b/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
similarity index 88%
rename from clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java
rename to clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
index 4a013b9..96d47c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
@@ -14,14 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.config;
+package org.apache.kafka.clients;
 
 import java.util.Locale;
 
 public enum ClientDnsLookup {
 
     DEFAULT("default"),
-    USE_ALL_DNS_IPS("use_all_dns_ips");
+    USE_ALL_DNS_IPS("use_all_dns_ips"),
+    RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY("resolve_canonical_bootstrap_servers_only");
 
     private String clientDnsLookup;
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 1661ea3..dce5f3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -42,10 +41,12 @@ import static org.apache.kafka.common.utils.Utils.getPort;
 public final class ClientUtils {
     private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
 
-    private ClientUtils() {}
+    private ClientUtils() {
+    }
 
-    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookup) {
         List<InetSocketAddress> addresses = new ArrayList<>();
+        ClientDnsLookup clientDnsLookupBehaviour = ClientDnsLookup.forConfig(clientDnsLookup);
         for (String url : urls) {
             if (url != null && !url.isEmpty()) {
                 try {
@@ -54,15 +55,30 @@ public final class ClientUtils {
                     if (host == null || port == null)
                         throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
 
-                    InetSocketAddress address = new InetSocketAddress(host, port);
-
-                    if (address.isUnresolved()) {
-                        log.warn("Removing server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
+                    if (clientDnsLookupBehaviour == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
+                        InetAddress[] inetAddresses = InetAddress.getAllByName(host);
+                        for (InetAddress inetAddress : inetAddresses) {
+                            String resolvedCanonicalName = inetAddress.getCanonicalHostName();
+                            InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port);
+                            if (address.isUnresolved()) {
+                                log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname [} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
+                            } else {
+                                addresses.add(address);
+                            }
+                        }
                     } else {
-                        addresses.add(address);
+                        InetSocketAddress address = new InetSocketAddress(host, port);
+                        if (address.isUnresolved()) {
+                            log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
+                        } else {
+                            addresses.add(address);
+                        }
                     }
+
                 } catch (IllegalArgumentException e) {
                     throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                } catch (UnknownHostException e) {
+                    throw new ConfigException("Unknown host in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                 }
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index b697de7..f198533 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients;
 
 import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 
 import java.net.InetAddress;
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index b179f02..c8e2357 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -41,6 +41,11 @@ public class CommonClientConfigs {
                                                        + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
                                                        + "servers (you may want more than one, though, in case a server is down).";
 
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
+    public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
+            + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers.</p>"
+            + "<p>If the value is <code>resolve_canonical_bootstrap_servers_only</code> each entry will be resolved and expanded into a list of canonical names.</p>";
+
     public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
     public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
 
@@ -93,9 +98,6 @@ public class CommonClientConfigs {
                                                          + "elapses the client will resend the request if necessary or fail the request if "
                                                          + "retries are exhausted.";
 
-    public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
-    public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
-                                                        + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers.</p>";
 
     /**
      * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 7ea05f6..c6f0c0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index a051de2..47c76ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -17,9 +17,9 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -44,6 +44,12 @@ public class AdminClientConfig extends AbstractConfig {
     private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;
 
     /**
+     * <code>client.dns.lookup</code>
+     */
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+    private static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
+
+    /**
      * <code>reconnect.backoff.ms</code>
      */
     public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
@@ -159,12 +165,14 @@ public class AdminClientConfig extends AbstractConfig {
                                         in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
                                         Importance.LOW,
                                         METRICS_RECORDING_LEVEL_DOC)
-                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                                .define(CLIENT_DNS_LOOKUP_CONFIG,
                                         Type.STRING,
                                         ClientDnsLookup.DEFAULT.toString(),
-                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                                        in(ClientDnsLookup.DEFAULT.toString(),
+                                           ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                                           ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
                                         Importance.MEDIUM,
-                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+                                        CLIENT_DNS_LOOKUP_DOC)
                                 // security support
                                 .define(SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 86e1447..c8418c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -18,10 +18,10 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.StaleMetadataException;
@@ -46,7 +46,6 @@ import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -362,7 +361,7 @@ public class KafkaAdminClient extends AdminClient {
                 config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
                 config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
                 (int) TimeUnit.HOURS.toMillis(1),
-                ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                ClientDnsLookup.forConfig(config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                 time,
                 true,
                 apiVersions,
@@ -414,7 +413,8 @@ public class KafkaAdminClient extends AdminClient {
         this.time = time;
         this.metadataManager = metadataManager;
         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
-            config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+            config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+            config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
         metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
         this.metrics = metrics;
         this.client = client;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index a1c9dc2..795a762 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -90,6 +90,9 @@ public class ConsumerConfig extends AbstractConfig {
      */
     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
+    /** <code>client.dns.lookup</code> */
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+
     /**
      * <code>enable.auto.commit</code>
      */
@@ -258,7 +261,7 @@ public class ConsumerConfig extends AbstractConfig {
             " return the LSO";
 
     public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);
-    
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
@@ -266,6 +269,14 @@ public class ConsumerConfig extends AbstractConfig {
                                         new ConfigDef.NonNullValidator(),
                                         Importance.HIGH,
                                         CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                                .define(CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(),
+                                           ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                                           ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
                                 .define(SESSION_TIMEOUT_MS_CONFIG,
                                         Type.INT,
@@ -453,12 +464,6 @@ public class ConsumerConfig extends AbstractConfig {
                                         in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
                                         Importance.MEDIUM,
                                         ISOLATION_LEVEL_DOC)
-                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
-                                        Type.STRING,
-                                        ClientDnsLookup.DEFAULT.toString(),
-                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
-                                        Importance.MEDIUM,
-                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e79ff07..4061373 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -36,7 +36,6 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -710,8 +709,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                     true, false, clusterResourceListeners);
-            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0);
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+                    config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
             String metricGrpPrefix = "consumer";
             ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
@@ -732,7 +733,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
-                    ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                    ClientDnsLookup.forConfig(config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                     time,
                     true,
                     new ApiVersions(),
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 465e60f..c68a014 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -28,8 +28,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
@@ -49,7 +49,6 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -407,7 +406,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     apiVersions,
                     transactionManager,
                     new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
-            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
             if (metadata != null) {
                 this.metadata = metadata;
             } else {
@@ -449,7 +450,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                 producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                 requestTimeoutMs,
-                ClientDnsLookup.forConfig(producerConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                 time,
                 true,
                 apiVersions,
@@ -496,7 +497,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
 
     private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
+
         TransactionManager transactionManager = null;
+
         boolean userConfiguredIdempotence = false;
         if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
             userConfiguredIdempotence = true;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f08159d..c63477d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -52,6 +52,9 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>bootstrap.servers</code> */
     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
+    /** <code>client.dns.lookup</code> */
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+
     /** <code>metadata.max.age.ms</code> */
     public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
     private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
@@ -239,6 +242,14 @@ public class ProducerConfig extends AbstractConfig {
 
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                                .define(CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(),
+                                           ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                                           ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                                 .define(RETRIES_CONFIG, Type.INT, Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                                 .define(ACKS_CONFIG,
@@ -347,13 +358,7 @@ public class ProducerConfig extends AbstractConfig {
                                         null,
                                         new ConfigDef.NonEmptyString(),
                                         Importance.LOW,
-                                        TRANSACTIONAL_ID_DOC)
-                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
-                                        Type.STRING,
-                                        ClientDnsLookup.DEFAULT.toString(),
-                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
-                                        Importance.MEDIUM,
-                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);
+                                        TRANSACTIONAL_ID_DOC);
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index bea464f..35f52a9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients;
 
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.junit.Test;
 import static org.junit.Assert.assertEquals;
@@ -31,27 +30,46 @@ import java.util.List;
 
 public class ClientUtilsTest {
 
+
     @Test
-    public void testParseAndValidateAddresses() {
-        check("127.0.0.1:8000");
-        check("mydomain.com:8080");
-        check("[::1]:8000");
-        check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
-        List<InetSocketAddress> validatedAddresses = check("some.invalid.hostname.foo.bar.local:9999", "mydomain.com:10000");
+    public void testParseAndValidateAddresses() throws UnknownHostException {
+        checkWithoutLookup("127.0.0.1:8000");
+        checkWithoutLookup("localhost:8080");
+        checkWithoutLookup("[::1]:8000");
+        checkWithoutLookup("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "localhost:10000");
+        List<InetSocketAddress> validatedAddresses = checkWithoutLookup("localhost:10000");
         assertEquals(1, validatedAddresses.size());
         InetSocketAddress onlyAddress = validatedAddresses.get(0);
-        assertEquals("mydomain.com", onlyAddress.getHostName());
+        assertEquals("localhost", onlyAddress.getHostName());
         assertEquals(10000, onlyAddress.getPort());
     }
 
+    @Test
+    public void testParseAndValidateAddressesWithReverseLookup() {
+        checkWithoutLookup("127.0.0.1:8000");
+        checkWithoutLookup("localhost:8080");
+        checkWithoutLookup("[::1]:8000");
+        checkWithoutLookup("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "localhost:10000");
+        List<InetSocketAddress> validatedAddresses = checkWithLookup(Arrays.asList("example.com:10000"));
+        assertEquals(2, validatedAddresses.size());
+        InetSocketAddress address = validatedAddresses.get(0);
+        assertEquals("93.184.216.34", address.getHostName());
+        assertEquals(10000, address.getPort());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidConfig() {
+        ClientUtils.parseAndValidateAddresses(Arrays.asList("localhost:10000"), "random.value");
+    }
+
     @Test(expected = ConfigException.class)
     public void testNoPort() {
-        check("127.0.0.1");
+        checkWithoutLookup("127.0.0.1");
     }
 
     @Test(expected = ConfigException.class)
     public void testOnlyBadHostname() {
-        check("some.invalid.hostname.foo.bar.local:9999");
+        checkWithoutLookup("some.invalid.hostname.foo.bar.local:9999");
     }
 
     @Test
@@ -87,7 +105,12 @@ public class ClientUtilsTest {
         assertEquals(2, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size());
     }
 
-    private List<InetSocketAddress> check(String... url) {
-        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
+    private List<InetSocketAddress> checkWithoutLookup(String... url) {
+        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT.toString());
+    }
+
+    private List<InetSocketAddress> checkWithLookup(List<String> url) {
+        return ClientUtils.parseAndValidateAddresses(url, ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString());
     }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index d4a2a55..23edaa9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.Before;
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index aaf827f..8abe9a40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.CommonFields;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 42deee7..94d8d5b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.FetchSessionHandler;
@@ -36,7 +37,6 @@ import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
@@ -2735,7 +2735,7 @@ public class FetcherTest {
         String topicName2 = "topic2";
         TopicPartition t2p0 = new TopicPartition(topicName2, 0);
         // Expect a metadata refresh.
-        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))),
+        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), ClientDnsLookup.DEFAULT.toString())),
                         Collections.<String>emptySet(),
                         time.milliseconds());
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 6d4d78c..23ca2ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
@@ -44,7 +45,6 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index aac9fb2..be3a709 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -61,6 +61,9 @@ public class WorkerConfig extends AbstractConfig {
             + "than one, though, in case a server is down).";
     public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
 
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+    public static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
+
     public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
     public static final String KEY_CONVERTER_CLASS_DOC =
             "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
@@ -223,6 +226,14 @@ public class WorkerConfig extends AbstractConfig {
         return new ConfigDef()
                 .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
                         Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
+                .define(CLIENT_DNS_LOOKUP_CONFIG,
+                        Type.STRING,
+                        ClientDnsLookup.DEFAULT.toString(),
+                        in(ClientDnsLookup.DEFAULT.toString(),
+                           ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                           ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+                        Importance.MEDIUM,
+                        CLIENT_DNS_LOOKUP_DOC)
                 .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
                 .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
@@ -278,13 +289,7 @@ public class WorkerConfig extends AbstractConfig {
                         Collections.emptyList(),
                         Importance.LOW, CONFIG_PROVIDERS_DOC)
                 .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
-                        Importance.LOW, REST_EXTENSION_CLASSES_DOC)
-                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
-                        Type.STRING,
-                        ClientDnsLookup.DEFAULT.toString(),
-                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
-                        Importance.MEDIUM,
-                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);    
+                        Importance.LOW, REST_EXTENSION_CLASSES_DOC);
     }
 
     private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index de8e8b2..5725ff5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
@@ -24,7 +25,6 @@ import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -96,7 +96,9 @@ public class WorkerGroupMember {
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true);
-            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+                    config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+                    config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
             String metricGrpPrefix = "connect";
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 5876b6e..aaa0903 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -23,6 +23,7 @@ import kafka.coordinator.group.GroupOverview
 import kafka.utils.Logging
 import org.apache.kafka.clients._
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
+import org.apache.kafka.common.config.ConfigDef.ValidString._
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}
@@ -39,7 +40,6 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
-import org.apache.kafka.common.config.ClientDnsLookup
 
 /**
   * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
@@ -386,6 +386,14 @@ object AdminClient {
         Type.LIST,
         Importance.HIGH,
         CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+      .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+        Type.STRING,
+        ClientDnsLookup.DEFAULT.toString,
+        in(ClientDnsLookup.DEFAULT.toString,
+           ClientDnsLookup.USE_ALL_DNS_IPS.toString,
+           ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
+        Importance.MEDIUM,
+        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
       .define(
         CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
         ConfigDef.Type.STRING,
@@ -429,7 +437,8 @@ object AdminClient {
     val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
 
     val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
-    val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
+    val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
+    val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
     val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
     metadata.update(bootstrapCluster, Collections.emptySet(), 0)
 
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 86b6f94..7002219 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -39,8 +39,6 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.collection.{Set, mutable}
-import org.apache.kafka.common.config.ClientDnsLookup
-
 
 object ControllerChannelManager {
   val QueueSizeMetricName = "QueueSize"
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index d0e765c..bd25d94 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -36,7 +36,6 @@ import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQue
 
 import collection.JavaConverters._
 import scala.collection.{concurrent, immutable}
-import org.apache.kafka.common.config.ClientDnsLookup
 
 object TransactionMarkerChannelManager {
   def apply(config: KafkaConfig,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 841ea82..bef0663 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -37,7 +37,7 @@ import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
 import kafka.zk.{BrokerInfo, KafkaZkClient}
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
@@ -51,7 +51,6 @@ import org.apache.kafka.common.{ClusterResource, Node}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq, mutable}
-import org.apache.kafka.common.config.ClientDnsLookup
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index d15fdae..4e642f3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -30,7 +30,6 @@ import org.apache.kafka.common.Node
 import org.apache.kafka.common.requests.AbstractRequest.Builder
 
 import scala.collection.JavaConverters._
-import org.apache.kafka.common.config.ClientDnsLookup
 
 trait BlockingSend {
 
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 7871015..1f87d7a 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
-import org.apache.kafka.common.config.ClientDnsLookup
 
 /**
  * For verifying the consistency among replicas.
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 74ab234..9f15696 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -20,8 +20,8 @@ package org.apache.kafka.trogdor.workload;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.ManualMetadataUpdater;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NetworkClientUtils;
@@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -127,7 +126,8 @@ public class ConnectionStressWorker implements TaskWorker {
                 WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.commonClientConf());
                 AdminClientConfig conf = new AdminClientConfig(props);
                 List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
-                    conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+                        conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                        conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
                 ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
                 while (true) {
                     if (doneFuture.isDone()) {
@@ -182,7 +182,7 @@ public class ConnectionStressWorker implements TaskWorker {
                                     4096,
                                     4096,
                                     1000,
-                                    ClientDnsLookup.forConfig(conf.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                                    ClientDnsLookup.forConfig(conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                                     Time.SYSTEM,
                                     false,
                                     new ApiVersions(),


Mime
View raw message